diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 15a0bf45..c5c46c5b 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -22,7 +22,9 @@ jobs:
     if: ${{ !cancelled() && 'pull_request' != github.event_name }}
     runs-on: ubuntu-latest
     outputs:
+      ffmpeg-date: ${{ steps.jq.outputs.FFMPEG_DATE }}
       ffmpeg-releases: ${{ steps.ffmpeg.outputs.releases }}
+      ffmpeg-version: ${{ steps.jq.outputs.FFMPEG_VERSION }}
       lowercase-github-actor: ${{ steps.github-actor.outputs.lowercase }}
       lowercase-github-repository_owner: ${{ steps.github-repository_owner.outputs.lowercase }}
       ytdlp-latest-release: ${{ steps.yt-dlp.outputs.latest-release }}
@@ -45,43 +47,11 @@ jobs:
       - name: Retrieve yt-dlp/yt-dlp releases with GitHub CLI
         id: yt-dlp
         uses: ./.github/actions/yt-dlp
-
-  test:
-    if: ${{ !cancelled() && ( 'pull_request' != github.event_name || (! github.event.pull_request.draft) ) }}
-    runs-on: ubuntu-22.04
-    strategy:
-      fail-fast: false
-      matrix:
-        python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
-    steps:
-      - uses: actions/checkout@v4
-      - name: Install Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v5
-        with:
-          python-version: ${{ matrix.python-version }}
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install pipenv
-          pipenv install --system --skip-lock
-      - name: Set up Django environment
-        run: |
-          cp -v -p tubesync/tubesync/local_settings.py.example tubesync/tubesync/local_settings.py
-          cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/background_task/ patches/background_task/*
-          cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/yt_dlp/ patches/yt_dlp/*
-      - name: Run Django tests
-        run: cd tubesync && python3 manage.py test --verbosity=2
-
-  containerise:
-    if: ${{ !cancelled() && 'success' == needs.info.result }}
-    needs: ['info', 'test']
-    runs-on: ubuntu-latest
-    timeout-minutes: 120
-    steps:
-      - name: Set environment variables with jq
+      - name: Set outputs with jq
+        id: jq
         run: |
           cat >| .ffmpeg.releases.json <<'EOF'
-          ${{ needs.info.outputs.ffmpeg-releases }}
+          ${{ steps.ffmpeg.outputs.releases }}
           EOF
           mk_delim() { local f='%s_EOF_%d_' ; printf -- "${f}" "$1" "${RANDOM}" ; } ;
           open_ml_var() { local f=''\%'s<<'\%'s\n' ; printf -- "${f}" "$2" "$1" ; } ;
@@ -103,9 +73,42 @@ jobs:
           jq -r --arg date "${ffmpeg_date}" "${jq_arg}" -- .ffmpeg.releases.json ;
           close_ml_var "${delim}" "${var}" ;
           unset -v delim jq_arg var ;
-          } >> "${GITHUB_ENV}"
-          cat -v "${GITHUB_ENV}"
+          } >> "${GITHUB_OUTPUT}"
+          cat -v "${GITHUB_OUTPUT}"
           rm -v -f .ffmpeg.releases.json
+
+  test:
+    if: ${{ !cancelled() && ( 'pull_request' != github.event_name || (! github.event.pull_request.draft) ) }}
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
+    steps:
+      - uses: actions/checkout@v4
+      - name: Install Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install pipenv
+          PIPENV_VERBOSITY=64 pipenv install --system --skip-lock
+      - name: Set up Django environment
+        run: |
+          cp -v -p tubesync/tubesync/local_settings.py.example tubesync/tubesync/local_settings.py
+          cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/background_task/ patches/background_task/*
+          cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/yt_dlp/ patches/yt_dlp/*
+      - name: Run Django tests
+        run: cd tubesync && python3 manage.py test --verbosity=2
+
+  containerise:
+    if: ${{ !cancelled() && 'success' == needs.info.result }}
+    needs: ['info', 'test']
+    runs-on: ubuntu-latest
+    timeout-minutes: 120
+    steps:
       - name: Set up QEMU
         uses: docker/setup-qemu-action@v3
       - name: Set up Docker Buildx
@@ -117,7 +120,31 @@
           DOCKER_USERNAME: ${{ github.actor }}
           DOCKER_TOKEN: ${{ 'meeb' == github.repository_owner && secrets.REGISTRY_ACCESS_TOKEN || secrets.GITHUB_TOKEN }}
         run: echo "${DOCKER_TOKEN}" | docker login --password-stdin --username "${DOCKER_USERNAME}" "${DOCKER_REGISTRY}"
+      - name: Build image for `dive`
+        id: build-dive-image
+        uses: docker/build-push-action@v6
+        with:
+          build-args: |
+            IMAGE_NAME=${{ env.IMAGE_NAME }}
+            FFMPEG_DATE=${{ needs.info.outputs.ffmpeg-date }}
+            FFMPEG_VERSION=${{ needs.info.outputs.ffmpeg-version }}
+            YTDLP_DATE=${{ fromJSON(needs.info.outputs.ytdlp-latest-release).tag.name }}
+          cache-from: type=gha
+          load: true
+          platforms: linux/amd64
+          push: false
+          tags: ghcr.io/${{ needs.info.outputs.lowercase-github-actor }}/${{ env.IMAGE_NAME }}:dive
+      - name: Analysis with `dive`
+        run: |
+          docker run --rm \
+            -v /var/run/docker.sock:/var/run/docker.sock \
+            'ghcr.io/wagoodman/dive' \
+            'ghcr.io/${{ needs.info.outputs.lowercase-github-actor }}/${{ env.IMAGE_NAME }}:dive' \
+            --ci \
+            --highestUserWastedPercent '0.03' \
+            --highestWastedBytes '10M'
       - name: Build and push
+        id: build-push
         timeout-minutes: 60
         uses: docker/build-push-action@v6
         with:
@@ -133,6 +160,6 @@
             ${{ 'meeb' == github.repository_owner && 'pull_request' != github.event_name && 'type=inline' || '' }}
           build-args: |
             IMAGE_NAME=${{ env.IMAGE_NAME }}
-            FFMPEG_DATE=${{ env.FFMPEG_DATE }}
-            FFMPEG_VERSION=${{ env.FFMPEG_VERSION }}
+            FFMPEG_DATE=${{ needs.info.outputs.ffmpeg-date }}
+            FFMPEG_VERSION=${{ needs.info.outputs.ffmpeg-version }}
             YTDLP_DATE=${{ fromJSON(needs.info.outputs.ytdlp-latest-release).tag.name }}
diff --git a/Dockerfile b/Dockerfile
index 82bc665e..d3169884 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -47,7 +47,8 @@ RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/va
     locale-gen en_US.UTF-8 && \
     # Clean up
     apt-get -y autopurge && \
-    apt-get -y autoclean
+    apt-get -y autoclean && \
+    rm -f /var/cache/debconf/*.dat-old
 
 FROM alpine:${ALPINE_VERSION} AS ffmpeg-download
 ARG FFMPEG_DATE
@@ -289,7 +290,8 @@ RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/va
     useradd -M -d /app -s /bin/false -g app app && \
     # Clean up
     apt-get -y autopurge && \
-    apt-get -y autoclean
+    apt-get -y autoclean && \
+    rm -v -f /var/cache/debconf/*.dat-old
 
 # Install third party software
 COPY --from=s6-overlay / /
@@ -310,7 +312,8 @@ RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/va
     apt-get -y autoremove --purge file && \
     # Clean up
     apt-get -y autopurge && \
-    apt-get -y autoclean
+    apt-get -y autoclean && \
+    rm -v -f /var/cache/debconf/*.dat-old
 
 # Switch workdir to the the app
 WORKDIR /app
@@ -362,6 +365,7 @@ RUN --mount=type=tmpfs,target=/cache \
     && \
     apt-get -y autopurge && \
     apt-get -y autoclean && \
+    rm -v -f /var/cache/debconf/*.dat-old && \
     rm -v -rf /tmp/*
 
 # Copy root
diff --git a/Pipfile b/Pipfile
index bf53b4bf..2976db2e 100644
--- a/Pipfile
+++ b/Pipfile
@@ -7,7 +7,7 @@ verify_ssl = true
 autopep8 = "*"
 
 [packages]
-django = "*"
+django = "<5.2"
 django-sass-processor = {extras = ["management-command"], version = "*"}
 pillow = "*"
 whitenoise = "*"
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/dependencies b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/dependencies
similarity index 100%
rename from config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/dependencies
rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/dependencies
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/down-signal b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/down-signal
similarity index 100%
rename from config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/down-signal
rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/down-signal
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run
new file mode 100755
index 00000000..03b75ea8
--- /dev/null
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run
@@ -0,0 +1,5 @@
+#!/command/with-contenv bash
+
+exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
+    /usr/bin/python3 /app/manage.py process_tasks \
+    --queue database
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/type b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/type
similarity index 100%
rename from config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/type
rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/type
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/dependencies b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/dependencies
new file mode 100644
index 00000000..283e1305
--- /dev/null
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/dependencies
@@ -0,0 +1 @@
+gunicorn
\ No newline at end of file
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/down-signal b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/down-signal
new file mode 100644
index 00000000..d751378e
--- /dev/null
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/down-signal
@@ -0,0 +1 @@
+SIGINT
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run
new file mode 100755
index 00000000..0642054d
--- /dev/null
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run
@@ -0,0 +1,5 @@
+#!/command/with-contenv bash
+
+exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
+    /usr/bin/python3 /app/manage.py process_tasks \
+    --queue filesystem
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/type b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/type
new file mode 100644
index 00000000..1780f9f4
--- /dev/null
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/type
@@ -0,0 +1 @@
+longrun
\ No newline at end of file
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/dependencies b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/dependencies
new file mode 100644
index 00000000..283e1305
--- /dev/null
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/dependencies
@@ -0,0 +1 @@
+gunicorn
\ No newline at end of file
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/down-signal b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/down-signal
new file mode 100644
index 00000000..d751378e
--- /dev/null
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/down-signal
@@ -0,0 +1 @@
+SIGINT
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run
similarity index 53%
rename from config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/run
rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run
index b2c3a841..a9c17d49 100755
--- a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/run
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run
@@ -1,4 +1,5 @@
 #!/command/with-contenv bash
 
 exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
-    /usr/bin/python3 /app/manage.py process_tasks
+    /usr/bin/python3 /app/manage.py process_tasks \
+    --queue network
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/type b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/type
new file mode 100644
index 00000000..1780f9f4
--- /dev/null
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/type
@@ -0,0 +1 @@
+longrun
\ No newline at end of file
diff --git a/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-worker b/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-db-worker
similarity index 100%
rename from config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-worker
rename to config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-db-worker
diff --git a/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-fs-worker b/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-fs-worker
new file mode 100644
index 00000000..e69de29b
diff --git a/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-network-worker b/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-network-worker
new file mode 100644
index 00000000..e69de29b
diff --git a/patches/background_task/management/commands/process_tasks.py b/patches/background_task/management/commands/process_tasks.py
new file mode 100644
index 00000000..9484c393
--- /dev/null
+++ b/patches/background_task/management/commands/process_tasks.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+import logging
+import random
+import sys
+import time
+
+from django import VERSION
+from django.core.management.base import BaseCommand
+from django.utils import autoreload
+
+from background_task.tasks import tasks, autodiscover
+from background_task.utils import SignalManager
+from django.db import close_old_connections as close_connection
+
+
+logger = logging.getLogger(__name__)
+
+
+def _configure_log_std():
+    class StdOutWrapper(object):
+        def write(self, s):
+            logger.info(s)
+
+    class StdErrWrapper(object):
+        def write(self, s):
+            logger.error(s)
+    sys.stdout = StdOutWrapper()
+    sys.stderr = StdErrWrapper()
+
+
+class Command(BaseCommand):
+    help = 'Run tasks that are scheduled to run on the queue'
+
+    # Command options are specified in an abstract way to enable Django < 1.8 compatibility
+    OPTIONS = (
+        (('--duration', ), {
+            'action': 'store',
+            'dest': 'duration',
+            'type': int,
+            'default': 0,
+            'help': 'Run task for this many seconds (0 or less to run forever) - default is 0',
+        }),
+        (('--sleep', ), {
+            'action': 'store',
+            'dest': 'sleep',
+            'type': float,
+            'default': 5.0,
+            'help': 'Sleep for this many seconds before checking for new tasks (if none were found) - default is 5',
+        }),
+        (('--queue', ), {
+            'action': 'store',
+            'dest': 'queue',
+            'help': 'Only process tasks on this named queue',
+        }),
+        (('--log-std', ), {
+            'action': 'store_true',
+            'dest': 'log_std',
+            'help': 'Redirect stdout and stderr to the logging system',
+        }),
+        (('--dev', ), {
+            'action': 'store_true',
+            'dest': 'dev',
+            'help': 'Auto-reload your code on changes. Use this only for development',
+        }),
+    )
+
+    if VERSION < (1, 8):
+        from optparse import make_option
+        option_list = BaseCommand.option_list + tuple([make_option(*args, **kwargs) for args, kwargs in OPTIONS])
+
+    # Used in Django >= 1.8
+    def add_arguments(self, parser):
+        for (args, kwargs) in self.OPTIONS:
+            parser.add_argument(*args, **kwargs)
+
+    def __init__(self, *args, **kwargs):
+        super(Command, self).__init__(*args, **kwargs)
+        self.sig_manager = None
+        self._tasks = tasks
+
+    def run(self, *args, **options):
+        duration = options.get('duration', 0)
+        sleep = options.get('sleep', 5.0)
+        queue = options.get('queue', None)
+        log_std = options.get('log_std', False)
+        is_dev = options.get('dev', False)
+        sig_manager = self.sig_manager
+
+        if is_dev:
+            # raise the last Exception if one exists
+            autoreload.raise_last_exception()
+
+        if log_std:
+            _configure_log_std()
+
+        autodiscover()
+
+        start_time = time.time()
+
+        while (duration <= 0) or (time.time() - start_time) <= duration:
+            if sig_manager.kill_now:
+                # shutting down gracefully
+                break
+
+            if not self._tasks.run_next_task(queue):
+                # there were no tasks in the queue, let's recover.
+                close_connection()
+                logger.debug('waiting for tasks')
+                time.sleep(sleep)
+            else:
+                # there were some tasks to process, let's check if there is more work to do after a little break.
+                time.sleep(random.uniform(sig_manager.time_to_wait[0], sig_manager.time_to_wait[1]))
+        self._tasks._pool_runner._pool.close()
+
+    def handle(self, *args, **options):
+        is_dev = options.get('dev', False)
+        self.sig_manager = SignalManager()
+        if is_dev:
+            reload_func = autoreload.run_with_reloader
+            if VERSION < (2, 2):
+                reload_func = autoreload.main
+            reload_func(self.run, *args, **options)
+        else:
+            self.run(*args, **options)
diff --git a/tubesync/full_playlist.sh b/tubesync/full_playlist.sh
old mode 100644
new mode 100755
diff --git a/tubesync/sync/choices.py b/tubesync/sync/choices.py
index c67de54b..25dd762a 100644
--- a/tubesync/sync/choices.py
+++ b/tubesync/sync/choices.py
@@ -160,6 +160,12 @@ class SponsorBlock_Category(models.TextChoices):
     MUSIC_OFFTOPIC = 'music_offtopic', _( 'Non-Music Section' )
 
 
+class TaskQueue(models.TextChoices):
+    DB = 'database', _('Database')
+    FS = 'filesystem', _('Filesystem')
+    NET = 'network', _('Networking')
+
+
 class YouTube_SourceType(models.TextChoices):
     CHANNEL = 'c', _('YouTube channel')
     CHANNEL_ID = 'i', _('YouTube channel by ID')
diff --git a/tubesync/sync/management/commands/delete-source.py b/tubesync/sync/management/commands/delete-source.py
index 206aee7f..5ab8a325 100644
--- a/tubesync/sync/management/commands/delete-source.py
+++ b/tubesync/sync/management/commands/delete-source.py
@@ -6,7 +6,7 @@ from django.db.models import signals
 from common.logger import log
 from sync.models import Source, Media, MediaServer
 from sync.signals import media_post_delete
-from sync.tasks import rescan_media_server
+from sync.tasks import schedule_media_servers_update
 
 
 class Command(BaseCommand):
@@ -37,15 +37,6 @@ class Command(BaseCommand):
         log.info(f'Source directory: {source.directory_path}')
         source.delete()
         # Update any media servers
-        for mediaserver in MediaServer.objects.all():
-            log.info(f'Scheduling media server updates')
-            verbose_name = _('Request media server rescan for "{}"')
-            rescan_media_server(
-                str(mediaserver.pk),
-                priority=0,
-                schedule=30,
-                verbose_name=verbose_name.format(mediaserver),
-                remove_existing_tasks=True
-            )
+        schedule_media_servers_update()
         # All done
         log.info('Done')
diff --git a/tubesync/sync/management/commands/reset-tasks.py b/tubesync/sync/management/commands/reset-tasks.py
index 7d78c09f..d7818007 100644
--- a/tubesync/sync/management/commands/reset-tasks.py
+++ b/tubesync/sync/management/commands/reset-tasks.py
@@ -1,4 +1,5 @@
 from django.core.management.base import BaseCommand, CommandError
+from django.db.transaction import atomic
 from django.utils.translation import gettext_lazy as _
 from background_task.models import Task
 from sync.models import Source
@@ -12,6 +13,7 @@ class Command(BaseCommand):
 
     help = 'Resets all tasks'
 
+    @atomic(durable=True)
     def handle(self, *args, **options):
         log.info('Resettings all tasks...')
         # Delete all tasks
@@ -24,8 +26,6 @@ class Command(BaseCommand):
                 index_source_task(
                     str(source.pk),
                     repeat=source.index_schedule,
-                    queue=str(source.pk),
-                    priority=10,
                     verbose_name=verbose_name.format(source.name)
                 )
             # This also chains down to call each Media objects .save() as well
diff --git a/tubesync/sync/matching.py b/tubesync/sync/matching.py
index 9390e6fa..ffb86416 100644
--- a/tubesync/sync/matching.py
+++ b/tubesync/sync/matching.py
@@ -236,7 +236,7 @@ def get_best_video_format(media):
                 break
     if not best_match:
         for fmt in video_formats:
-            # Check for codec and resolution match bot drop 60fps
+            # Check for codec and resolution match but drop 60fps
             if (source_resolution == fmt['format'] and
                 source_vcodec == fmt['vcodec'] and
                 not fmt['is_hdr']):
@@ -294,7 +294,7 @@ def get_best_video_format(media):
                 break
     if not best_match:
         for fmt in video_formats:
-            # Check for codec and resolution match bot drop hdr
+            # Check for codec and resolution match but drop hdr
             if (source_resolution == fmt['format'] and
                 source_vcodec == fmt['vcodec'] and
                 not fmt['is_60fps']):
diff --git a/tubesync/sync/models.py b/tubesync/sync/models.py
index 802356e8..c3424bd6 100644
--- a/tubesync/sync/models.py
+++ b/tubesync/sync/models.py
@@ -237,7 +237,7 @@ class Source(models.Model):
         _('source video codec'),
         max_length=8,
         db_index=True,
-        choices=list(reversed(YouTube_VideoCodec.choices[1:])),
+        choices=list(reversed(YouTube_VideoCodec.choices)),
         default=YouTube_VideoCodec.VP9,
         help_text=_('Source video codec, desired video encoding format to download (ignored if "resolution" is audio only)')
     )
diff --git a/tubesync/sync/signals.py b/tubesync/sync/signals.py
index be848a0a..6ee64747 100644
--- a/tubesync/sync/signals.py
+++ b/tubesync/sync/signals.py
@@ -30,6 +30,8 @@ def source_pre_save(sender, instance, **kwargs):
         log.debug(f'source_pre_save signal: no existing source: {sender} - {instance}')
         return
 
+    args = ( str(instance.pk), )
+    check_source_directory_exists.now(*args)
     existing_dirpath = existing_source.directory_path.resolve(strict=True)
     new_dirpath = instance.directory_path.resolve(strict=False)
     if existing_dirpath != new_dirpath:
@@ -90,12 +92,9 @@ def source_pre_save(sender, instance, **kwargs):
         verbose_name = _('Index media from source "{}"')
         index_source_task(
             str(instance.pk),
-            schedule=instance.index_schedule,
             repeat=instance.index_schedule,
-            queue=str(instance.pk),
-            priority=10,
+            schedule=instance.index_schedule,
             verbose_name=verbose_name.format(instance.name),
-            remove_existing_tasks=True
         )
 
 
@@ -106,14 +105,12 @@ def source_post_save(sender, instance, created, **kwargs):
         verbose_name = _('Check download directory exists for source "{}"')
         check_source_directory_exists(
             str(instance.pk),
-            priority=0,
-            verbose_name=verbose_name.format(instance.name)
+            verbose_name=verbose_name.format(instance.name),
         )
         if instance.source_type != Val(YouTube_SourceType.PLAYLIST) and instance.copy_channel_images:
             download_source_images(
                 str(instance.pk),
-                priority=5,
-                verbose_name=verbose_name.format(instance.name)
+                verbose_name=verbose_name.format(instance.name),
             )
         if instance.index_schedule > 0:
             delete_task_by_source('sync.tasks.index_source_task', instance.pk)
@@ -121,20 +118,15 @@ def source_post_save(sender, instance, created, **kwargs):
             verbose_name = _('Index media from source "{}"')
             index_source_task(
                 str(instance.pk),
-                schedule=600,
                 repeat=instance.index_schedule,
-                queue=str(instance.pk),
-                priority=10,
+                schedule=600,
                 verbose_name=verbose_name.format(instance.name),
-                remove_existing_tasks=True
             )
     verbose_name = _('Checking all media for source "{}"')
     save_all_media_for_source(
         str(instance.pk),
-        priority=25,
         verbose_name=verbose_name.format(instance.name),
-        remove_existing_tasks=True
     )
 
 
@@ -155,7 +147,6 @@ def source_pre_delete(sender, instance, **kwargs):
     delete_all_media_for_source(
         str(instance.pk),
         str(instance.name),
-        priority=1,
         verbose_name=verbose_name.format(instance.name),
     )
     # Try to do it all immediately
@@ -242,10 +233,7 @@ def media_post_save(sender, instance, created, **kwargs):
             verbose_name = _('Renaming media for: {}: "{}"')
             rename_media(
                 str(media.pk),
-                queue=str(media.pk),
-                priority=20,
                 verbose_name=verbose_name.format(media.key, media.name),
-                remove_existing_tasks=True
             )
     # If the media is missing metadata schedule it to be downloaded
@@ -254,9 +242,7 @@
         verbose_name = _('Downloading metadata for "{}"')
         download_media_metadata(
             str(instance.pk),
-            priority=20,
             verbose_name=verbose_name.format(instance.pk),
-            remove_existing_tasks=True
         )
     # If the media is missing a thumbnail schedule it to be downloaded (unless we are skipping this media)
     if not instance.thumb_file_exists:
@@ -270,10 +256,7 @@
             download_media_thumbnail(
                 str(instance.pk),
                 thumbnail_url,
-                queue=str(instance.source.pk),
-                priority=15,
                 verbose_name=verbose_name.format(instance.name),
-                remove_existing_tasks=True
             )
     # If the media has not yet been downloaded schedule it to be downloaded
     if not (instance.media_file_exists or instance.filepath.exists() or existing_media_download_task):
@@ -287,10 +270,7 @@
             verbose_name = _('Downloading media for "{}"')
             download_media(
                 str(instance.pk),
-                queue=str(instance.source.pk),
-                priority=15,
                 verbose_name=verbose_name.format(instance.name),
-                remove_existing_tasks=True
             )
     # Save the instance if any changes were required
     if skip_changed or can_download_changed:
diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index 6cf0fc2d..34c37e6c 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -21,10 +21,12 @@ from django.db.transaction import atomic
 from django.utils import timezone
 from django.utils.translation import gettext_lazy as _
 from background_task import background
+from background_task.exceptions import InvalidTaskError
 from background_task.models import Task, CompletedTask
 from common.logger import log
 from common.errors import NoMediaException, NoMetadataException, DownloadFailedException
 from common.utils import json_serial, remove_enclosed
+from .choices import Val, TaskQueue
 from .models import Source, Media, MediaServer
 from .utils import (get_remote_image, resize_image_to_height, delete_file,
                     write_text_file, filter_response)
@@ -123,7 +125,8 @@ def update_task_status(task, status):
     except DatabaseError as e:
         if 'Save with update_fields did not affect any rows.' == str(e):
             pass
-        raise
+        else:
+            raise
     return True
 
 
@@ -131,11 +134,12 @@ def get_source_completed_tasks(source_id, only_errors=False):
     '''
         Returns a queryset of CompletedTask objects for a source by source ID.
     '''
-    q = {'queue': source_id}
+    q = {'task_params__istartswith': f'[["{source_id}"'}
     if only_errors:
         q['failed_at__isnull'] = False
     return CompletedTask.objects.filter(**q).order_by('-failed_at')
 
+
 def get_tasks(task_name, id=None, /, instance=None):
     assert not (id is None and instance is None)
     arg = str(id or instance.pk)
@@ -160,10 +164,15 @@ def get_source_check_task(source_id):
 def get_source_index_task(source_id):
     return get_first_task('sync.tasks.index_source_task', source_id)
 
+
 def delete_task_by_source(task_name, source_id):
     now = timezone.now()
     unlocked = Task.objects.unlocked(now)
-    return unlocked.filter(task_name=task_name, queue=str(source_id)).delete()
+    qs = unlocked.filter(
+        task_name=task_name,
+        task_params__istartswith=f'[["{source_id}"',
+    )
+    return qs.delete()
 
 
 def delete_task_by_media(task_name, args):
@@ -183,6 +192,13 @@ def cleanup_completed_tasks():
     CompletedTask.objects.filter(run_at__lt=delta).delete()
 
 
+@atomic(durable=False)
+def migrate_queues():
+    tqs = Task.objects.all()
+    qs = tqs.exclude(queue__in=TaskQueue.values)
+    return qs.update(queue=Val(TaskQueue.NET))
+
+
 def schedule_media_servers_update():
     with atomic():
         # Schedule a task to update media servers
@@ -191,7 +207,7 @@ def schedule_media_servers_update():
         for mediaserver in MediaServer.objects.all():
             rescan_media_server(
                 str(mediaserver.pk),
-                priority=30,
+                priority=10,
                 verbose_name=verbose_name.format(mediaserver),
                 remove_existing_tasks=True,
             )
@@ -225,7 +241,7 @@ def cleanup_removed_media(source, videos):
     schedule_media_servers_update()
 
 
-@background(schedule=300, remove_existing_tasks=True)
+@background(schedule=dict(priority=10, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def index_source_task(source_id):
     '''
         Indexes media available from a Source object.
@@ -235,18 +251,20 @@
     cleanup_old_media()
     try:
         source = Source.objects.get(pk=source_id)
-    except Source.DoesNotExist:
+    except Source.DoesNotExist as e:
         # Task triggered but the Source has been deleted, delete the task
-        return
+        raise InvalidTaskError(_('no such source')) from e
     # An inactive Source would return an empty list for videos anyway
     if not source.is_active:
         return
     # Reset any errors
+    # TODO: determine if this affects anything
     source.has_failed = False
     source.save()
     # Index the source
     videos = source.index_media()
     if not videos:
+        # TODO: Record this error in source.has_failed ?
         raise NoMediaException(f'Source "{source}" (ID: {source_id}) returned no '
                                f'media to index, is the source key valid? Check the '
                                f'source configuration is correct and that the source '
@@ -310,7 +328,7 @@ def index_source_task(source_id):
     cleanup_removed_media(source, videos)
 
 
-@background(schedule=0)
+@background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.FS))
 def check_source_directory_exists(source_id):
     '''
         Checks the output directory for a source exists and is writable, if it does
@@ -319,17 +337,17 @@ def check_source_directory_exists(source_id):
     '''
     try:
         source = Source.objects.get(pk=source_id)
-    except Source.DoesNotExist:
+    except Source.DoesNotExist as e:
         # Task triggered but the Source has been deleted, delete the task
-        return
+        raise InvalidTaskError(_('no such source')) from e
     # Check the source output directory exists
     if not source.directory_exists():
-        # Try and create it
+        # Try to create it
         log.info(f'Creating directory: {source.directory_path}')
         source.make_directory()
 
 
-@background(schedule=0)
+@background(schedule=dict(priority=5, run_at=10), queue=Val(TaskQueue.NET))
 def download_source_images(source_id):
     '''
         Downloads an image and save it as a local thumbnail attached to a
     '''
     try:
         source = Source.objects.get(pk=source_id)
-    except Source.DoesNotExist:
+    except Source.DoesNotExist as e:
         # Task triggered but the source no longer exists, do nothing
         log.error(f'Task download_source_images(pk={source_id}) called but no '
                   f'source exists with ID: {source_id}')
-        return
+        raise InvalidTaskError(_('no such source')) from e
     avatar, banner = source.get_image_url
     log.info(f'Thumbnail URL for source with ID: {source_id} / {source} '
              f'Avatar: {avatar} '
              f'Banner: {banner}')
@@ -379,18 +397,18 @@ def download_source_images(source_id):
     log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
 
 
-@background(schedule=60, remove_existing_tasks=True)
+@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def download_media_metadata(media_id):
     '''
         Downloads the metadata for a media item.
     '''
     try:
         media = Media.objects.get(pk=media_id)
-    except Media.DoesNotExist:
+    except Media.DoesNotExist as e:
         # Task triggered but the media no longer exists, do nothing
         log.error(f'Task download_media_metadata(pk={media_id}) called but no '
                   f'media exists with ID: {media_id}')
-        return
+        raise InvalidTaskError(_('no such media')) from e
     if media.manual_skip:
         log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.')
         return
@@ -428,12 +446,9 @@ def download_media_metadata(media_id):
             verbose_name = _('Waiting for the premiere of "{}" at: {}')
             wait_for_media_premiere(
                 str(media.pk),
-                priority=0,
-                queue=str(media.pk),
                 repeat=Task.HOURLY,
                 repeat_until = published_datetime + timedelta(hours=1),
                 verbose_name=verbose_name.format(media.key, published_datetime.isoformat(' ', 'seconds')),
-                remove_existing_tasks=True,
             )
             raise_exception = False
         if raise_exception:
@@ -466,7 +481,7 @@ def download_media_metadata(media_id):
                  f'{source} / {media}: {media_id}')
 
 
-@background(schedule=60, remove_existing_tasks=True)
+@background(schedule=dict(priority=15, run_at=10), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def download_media_thumbnail(media_id, url):
     '''
         Downloads an image from a URL and save it as a local thumbnail attached to a
@@ -474,10 +489,10 @@ def download_media_thumbnail(media_id, url):
     '''
     try:
         media = Media.objects.get(pk=media_id)
-    except Media.DoesNotExist:
+    except Media.DoesNotExist as e:
         # Task triggered but the media no longer exists, do nothing
-        return
-    if media.skip:
+        raise InvalidTaskError(_('no such media')) from e
+    if media.skip or media.manual_skip:
         # Media was toggled to be skipped after the task was scheduled
         log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
                  f'it is now marked to be skipped, not downloading thumbnail')
@@ -504,38 +519,43 @@ def download_media_thumbnail(media_id, url):
     return True
 
 
-@background(schedule=60, remove_existing_tasks=True)
+@background(schedule=dict(priority=15, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def download_media(media_id):
     '''
         Downloads the media to disk and attaches it to the Media instance.
     '''
     try:
         media = Media.objects.get(pk=media_id)
-    except Media.DoesNotExist:
+    except Media.DoesNotExist as e:
         # Task triggered but the media no longer exists, do nothing
-        return
-    if not media.has_metadata:
-        raise NoMetadataException('Metadata is not yet available.')
-    if media.skip:
-        # Media was toggled to be skipped after the task was scheduled
-        log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
-                 f'it is now marked to be skipped, not downloading')
-        return
-    downloaded_file_exists = (
-        media.media_file_exists or
-        media.filepath.exists()
-    )
-    if media.downloaded and downloaded_file_exists:
-        # Media has been marked as downloaded before the download_media task was fired,
-        # skip it
-        log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
-                 f'it has already been marked as downloaded, not downloading again')
-        return
+        raise InvalidTaskError(_('no such media')) from e
     if not media.source.download_media:
         log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
                  f'the source {media.source} has since been marked to not download, '
                  f'not downloading')
         return
+    if media.skip or media.manual_skip:
+        # Media was toggled to be skipped after the task was scheduled
+        log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
+                 f'it is now marked to be skipped, not downloading')
+        return
+    # metadata is required to generate the proper filepath
+    if not media.has_metadata:
+        raise NoMetadataException('Metadata is not yet available.')
+    downloaded_file_exists = (
+        media.downloaded and
+        media.has_metadata and
+        (
+            media.media_file_exists or
+            media.filepath.exists()
+        )
+    )
+    if downloaded_file_exists:
+        # Media has been marked as downloaded before the download_media task was fired,
+        # skip it
+        log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
+                 f'it has already been marked as downloaded, not downloading again')
+        return
     max_cap_age = media.source.download_cap_date
     published = media.published
     if max_cap_age and published:
@@ -608,16 +628,7 @@ def download_media(media_id):
                 log.warn(f'A permissions problem occured when writing the new media NFO file: {e.msg}')
                 pass
         # Schedule a task to update media servers
-        for mediaserver in MediaServer.objects.all():
-            log.info(f'Scheduling media server updates')
-            verbose_name = _('Request media server rescan for "{}"')
-            rescan_media_server(
-                str(mediaserver.pk),
-                queue=str(media.source.pk),
-                priority=0,
-                verbose_name=verbose_name.format(mediaserver),
-                remove_existing_tasks=True
-            )
+        schedule_media_servers_update()
     else:
         # Expected file doesn't exist on disk
         err = (f'Failed to download media: {media} (UUID: {media.pk}) to disk, '
@@ -630,22 +641,22 @@ def download_media(media_id):
         raise DownloadFailedException(err)
 
 
-@background(schedule=300, remove_existing_tasks=True)
+@background(schedule=dict(priority=0, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def rescan_media_server(mediaserver_id):
     '''
         Attempts to request a media rescan on a remote media server.
     '''
     try:
         mediaserver = MediaServer.objects.get(pk=mediaserver_id)
-    except MediaServer.DoesNotExist:
+    except MediaServer.DoesNotExist as e:
         # Task triggered but the media server no longer exists, do nothing
-        return
+        raise InvalidTaskError(_('no such server')) from e
     # Request an rescan / update
     log.info(f'Updating media server: {mediaserver}')
     mediaserver.update()
 
 
-@background(schedule=300, remove_existing_tasks=True)
+@background(schedule=dict(priority=25, run_at=600), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def save_all_media_for_source(source_id):
     '''
         Iterates all media items linked to a source and saves them to
@@ -655,11 +666,11 @@ def save_all_media_for_source(source_id):
     '''
     try:
         source = Source.objects.get(pk=source_id)
-    except Source.DoesNotExist:
+    except Source.DoesNotExist as e:
         # Task triggered but the source no longer exists, do nothing
         log.error(f'Task save_all_media_for_source(pk={source_id}) called but no '
                   f'source exists with ID: {source_id}')
-        return
+        raise InvalidTaskError(_('no such source')) from e
     already_saved = set()
     mqs = Media.objects.filter(source=source)
@@ -694,41 +705,41 @@ def save_all_media_for_source(source_id):
     # flags may need to be recalculated
     tvn_format = '2/{:,}' + f'/{mqs.count():,}'
     for mn, media in enumerate(mqs, start=1):
-        update_task_status(task, tvn_format.format(mn))
         if media.uuid not in already_saved:
+            update_task_status(task, tvn_format.format(mn))
             with atomic():
                 media.save()
     # Reset task.verbose_name to the saved value
     update_task_status(task, None)
 
 
-@background(schedule=60, remove_existing_tasks=True)
+@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
 def rename_media(media_id):
     try:
         media = Media.objects.defer('metadata', 'thumb').get(pk=media_id)
-    except Media.DoesNotExist:
-        return
+    except Media.DoesNotExist as e:
+        raise InvalidTaskError(_('no such media')) from e
     media.rename_files()
 
 
-@background(schedule=300, remove_existing_tasks=True)
+@background(schedule=dict(priority=20, run_at=300), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
 @atomic(durable=True)
 def rename_all_media_for_source(source_id):
     try:
         source = Source.objects.get(pk=source_id)
-    except Source.DoesNotExist:
+    except Source.DoesNotExist as e:
         # Task triggered but the source no longer exists, do nothing
         log.error(f'Task rename_all_media_for_source(pk={source_id}) called but no '
                   f'source exists with ID: {source_id}')
-        return
+        raise InvalidTaskError(_('no such source')) from e
     # Check that the settings allow renaming
-    rename_sources_setting = settings.RENAME_SOURCES or list()
+    rename_sources_setting = getattr(settings, 'RENAME_SOURCES', list())
     create_rename_tasks = (
         (
             source.directory and
             source.directory in rename_sources_setting
         ) or
-        settings.RENAME_ALL_SOURCES
+        getattr(settings, 'RENAME_ALL_SOURCES', False)
     )
     if not create_rename_tasks:
         return
@@ -744,15 +755,15 @@ def rename_all_media_for_source(source_id):
         media.rename_files()
 
 
-@background(schedule=60, remove_existing_tasks=True)
+@background(schedule=dict(priority=0, run_at=60), queue=Val(TaskQueue.DB), remove_existing_tasks=True)
 def wait_for_media_premiere(media_id):
     hours = lambda td: 1+int((24*td.days)+(td.seconds/(60*60)))
 
     try:
         media = Media.objects.get(pk=media_id)
-    except Media.DoesNotExist:
-        return
-    if media.metadata:
+    except Media.DoesNotExist as e:
+        raise InvalidTaskError(_('no such media')) from e
+    if media.has_metadata:
         return
     now = timezone.now()
     if media.published < now:
@@ -764,17 +775,20 @@ def wait_for_media_premiere(media_id):
         media.manual_skip = True
         media.title = _(f'Premieres in {hours(media.published - now)} hours')
     media.save()
+    task = get_media_premiere_task(media_id)
+    if task:
+        update_task_status(task, f'available in {hours(media.published - now)} hours')
 
 
-@background(schedule=300, remove_existing_tasks=False)
+@background(schedule=dict(priority=1, run_at=300), queue=Val(TaskQueue.FS), remove_existing_tasks=False)
 def delete_all_media_for_source(source_id, source_name):
     source = None
     try:
         source = Source.objects.get(pk=source_id)
-    except Source.DoesNotExist:
+    except Source.DoesNotExist as e:
         # Task triggered but the source no longer exists, do nothing
         log.error(f'Task delete_all_media_for_source(pk={source_id}) called but no '
                   f'source exists with ID: {source_id}')
-        pass
+        raise InvalidTaskError(_('no such source')) from e
     mqs = Media.objects.all().defer(
         'metadata',
     ).filter(
diff --git a/tubesync/sync/templates/sync/tasks-completed.html b/tubesync/sync/templates/sync/tasks-completed.html
index b87805be..52f576df 100644
--- a/tubesync/sync/templates/sync/tasks-completed.html
+++ b/tubesync/sync/templates/sync/tasks-completed.html
@@ -17,14 +17,14 @@
         {% if task.has_error %}
           {{ task.verbose_name }}
-          Source: "{{ task.queue }}"
+          Queue: "{{ task.queue }}"
           Error: "{{ task.error_message }}"
           Task ran at {{ task.run_at|date:'Y-m-d H:i:s' }}
         {% else %}
           {{ task.verbose_name }}
-          Source: "{{ task.queue }}"
+          Queue: "{{ task.queue }}"
           Task ran at {{ task.run_at|date:'Y-m-d H:i:s' }}
         {% endif %}
diff --git a/tubesync/sync/tests.py b/tubesync/sync/tests.py
index 514f75b1..303aa18a 100644
--- a/tubesync/sync/tests.py
+++ b/tubesync/sync/tests.py
@@ -20,7 +20,7 @@ from .tasks import cleanup_old_media, check_source_directory_exists
 from .filtering import filter_media
 from .utils import filter_response
 from .choices import (Val, Fallback, IndexSchedule, SourceResolution,
-                      YouTube_AudioCodec, YouTube_VideoCodec,
+                      TaskQueue, YouTube_AudioCodec, YouTube_VideoCodec,
                       YouTube_SourceType, youtube_long_source_types)
 
 
@@ -138,7 +138,7 @@ class FrontEndTestCase(TestCase):
         else:
             # Invalid source tests should reload the page with an error
             self.assertEqual(response.status_code, 200)
-            self.assertIn('