diff --git a/.github/actions/FFmpeg/action.yml b/.github/actions/FFmpeg/action.yml
new file mode 100644
index 00000000..c768592b
--- /dev/null
+++ b/.github/actions/FFmpeg/action.yml
@@ -0,0 +1,81 @@
+name: 'FFmpeg Builds'
+description: 'Use GitHub CLI & API to retrieve information about FFmpeg Build releases.'
+
+inputs:
+  token:
+    required: true
+    default: ${{ github.token }}
+    description: |
+      GH_TOKEN for GitHub CLI to use.
+      Default: `\$\{\{ github.token \}\}`
+  num-assets:
+    required: true
+    default: '25'
+    description: |
+      The number of assets (attached files) to retrieve from each release.
+      Default: 25
+  num-releases:
+    required: true
+    default: '35'
+    description: |
+      The number of releases to retrieve from the repository.
+      Default: 35
+  repository_owner:
+    required: true
+    default: 'yt-dlp'
+    description: |
+      The name of the user or organization that owns the repository.
+      Default: 'yt-dlp'
+  repository_name:
+    required: true
+    default: 'FFmpeg-Builds'
+    description: |
+      Which repository from the owner to search for releases.
+      Default: 'FFmpeg-Builds'
+
+outputs:
+  releases:
+    value: ${{ steps.set.outputs.releases }}
+    description: 'Generated JSON describing the released builds.'
+
+runs:
+  using: 'composite'
+  steps:
+    - name: Retrieve releases
+      id: 'set'
+      env:
+        GH_TOKEN: ${{ inputs.token }}
+        GH_REPO: '${{ inputs.repository_owner }}/${{ inputs.repository_name }}'
+        GH_API_GQL_ASSETS: '${{ inputs.num-assets }}'
+        GH_API_GQL_RELEASES: '${{ inputs.num-releases }}'
+        GH_API_GQL_OWNER: '${{ inputs.repository_owner }}'
+        GH_API_GQL_REPO: '${{ inputs.repository_name }}'
+      shell: 'bash'
+      run: |
+        command -v gh >/dev/null ;
+        command -v jq >/dev/null ;
+        gql_query='query($repo: String!, $owner: String!, $releases: Int!, $assets: Int!) { repository(owner: $owner, name: $repo) { releases(first: $releases, orderBy: { field: CREATED_AT, direction: DESC }) { nodes { tagName, isDraft, isPrerelease, isLatest, tag { name, target { oid, commitUrl } }, releaseAssets(first: $assets) { totalCount, nodes { name, size, downloadUrl } } } } } }' ;
+        gql_jq='[ .data.repository.releases.nodes[] | select((.isLatest or .isDraft or .isPrerelease) | not) | { "tag": .tag.name, "commit": .tag.target.oid, "date": .tag.name[1+(.tag.name|index("-")):], "assets": { "limit": '"${GH_API_GQL_ASSETS}"', "totalCount": .releaseAssets.totalCount }, "files": .releaseAssets.nodes, "versions": [ .releaseAssets.nodes[].name | select(contains("-linux64-"))[1+index("-"):index("-linux64-")] ] } ]' ;
+        mk_delim() { printf -- '"%s_EOF_%d_"' "$1" "${RANDOM}" ; } ;
+        open_ml_var() { local f=''\%'s<<'\%'s\n' ; printf -- "${f}" "$2" "$1" ; } ;
+        close_ml_var() { local f='%s\n' ; printf -- "${f}" "$1" ; } ;
+        {
+          var='releases' ;
+          delim="$(mk_delim "${var}")" ;
+          open_ml_var "${delim}" "${var}" ;
+          gh api graphql --cache 12h \
+            -F assets="${GH_API_GQL_ASSETS}" \
+            -F owner="${GH_API_GQL_OWNER}" \
+            -F repo="${GH_API_GQL_REPO}" \
+            -F releases="${GH_API_GQL_RELEASES}" \
+            -f query="${gql_query}" --jq "${gql_jq}" ;
+          close_ml_var "${delim}" "${var}" ;
+          unset -v delim jq_arg var ;
+        } >> "${GITHUB_OUTPUT}" ;
+        # Log the human version
+        gh api graphql --cache 12h \
+          -F assets="${GH_API_GQL_ASSETS}" \
+          -F owner="${GH_API_GQL_OWNER}" \
+          -F repo="${GH_API_GQL_REPO}" \
+          -F releases="${GH_API_GQL_RELEASES}" \
+          -f query="${gql_query}" --jq "${gql_jq}" | jq '.[]' -- ;
diff --git a/.github/actions/string-case/action.yml b/.github/actions/string-case/action.yml
new file mode 100644
index 00000000..cadcb74b
--- /dev/null
+++ b/.github/actions/string-case/action.yml
@@ -0,0 +1,57 @@
+name: Change String Case
+description: Make a string lowercase, uppercase, or capitalized
+
+inputs:
+  string:
+    description: The input string
+    required: true
+
+outputs:
+  lowercase:
+    value: ${{ steps.set.outputs.lowercase }}
+    description: The input string, with any uppercase characters replaced with lowercase ones
+  uppercase:
+    value: ${{ steps.set.outputs.uppercase }}
+    description: The input string, with any lowercase characters replaced with uppercase ones
+  capitalized:
+    value: ${{ steps.set.outputs.capitalized }}
+    description: The input string, with any alphabetical characters lowercase, except for the first character, which is uppercased
+
+runs:
+  using: 'composite'
+  steps:
+    - name: Set outputs
+      id: 'set'
+      env:
+        INPUT_STRING: '${{ inputs.string }}'
+      shell: 'bash'
+      run: |
+        printf -- 'Manipulating string: %s\n' "${INPUT_STRING}"
+        set_sl_var() { local f='%s=%s\n' ; printf -- "${f}" "$@" ; } ;
+        mk_delim() { printf -- '"%s_EOF_%d_"' "$1" "${RANDOM}" ; } ;
+        open_ml_var() { local f=''\%'s<<'\%'s\n' ; printf -- "${f}" "$2" "$1" ; } ;
+        close_ml_var() { local f='%s\n' ; printf -- "${f}" "$1" ; } ;
+        {
+
+          var='lowercase' ;
+          delim="$(mk_delim "${var}")" ;
+          open_ml_var "${delim}" "${var}" ;
+          printf -- '%s\n' "${INPUT_STRING,,}" ;
+          close_ml_var "${delim}" "${var}" ;
+
+          var='capitalized' ;
+          delim="$(mk_delim "${var}")" ;
+          open_ml_var "${delim}" "${var}" ;
+          printf -- '%s\n' "${INPUT_STRING^}" ;
+          close_ml_var "${delim}" "${var}" ;
+
+          var='uppercase' ;
+          delim="$(mk_delim "${var}")" ;
+          open_ml_var "${delim}" "${var}" ;
+          printf -- '%s\n' "${INPUT_STRING^^}" ;
+          close_ml_var "${delim}" "${var}" ;
+
+        } >> "${GITHUB_OUTPUT}"
+        printf -- '%s: %s\n' 'lowercase' "${INPUT_STRING,,}"
+        printf -- '%s: %s\n' 'uppercase' "${INPUT_STRING^^}"
+        printf -- '%s: %s\n' 'capitalized' "${INPUT_STRING^}"
diff --git a/.github/actions/yt-dlp/action.yml b/.github/actions/yt-dlp/action.yml
new file mode 100644
index 00000000..5f8b37ff
--- /dev/null
+++ b/.github/actions/yt-dlp/action.yml
@@ -0,0 +1,84 @@
+name: 'yt-dlp Releases'
+description: 'Use GitHub CLI & API to retrieve information about `yt-dlp` releases.'
+
+inputs:
+  token:
+    required: true
+    default: ${{ github.token }}
+    description: |
+      GH_TOKEN for GitHub CLI to use.
+      Default: `\$\{\{ github.token \}\}`
+  num-releases:
+    required: true
+    default: '25'
+    description: |
+      The number of releases to retrieve from the repository.
+      Default: 25
+  repository_owner:
+    required: true
+    default: 'yt-dlp'
+    description: |
+      The name of the user or organization that owns the repository.
+      Default: 'yt-dlp'
+  repository_name:
+    required: true
+    default: 'yt-dlp'
+    description: |
+      Which repository from the owner to search for releases.
+      Default: 'yt-dlp'
+
+outputs:
+  latest-release:
+    value: ${{ steps.set.outputs.latest-release }}
+    description: 'The JSON API response for the latest release.'
+  releases:
+    value: ${{ steps.set.outputs.releases }}
+    description: 'Retrieved JSON from the API describing the releases.'
+
+runs:
+  using: 'composite'
+  steps:
+    - name: Retrieve releases
+      id: 'set'
+      env:
+        GH_TOKEN: ${{ inputs.token }}
+        GH_API_GQL_RELEASES: '${{ inputs.num-releases }}'
+        GH_API_GQL_OWNER: '${{ inputs.repository_owner }}'
+        GH_API_GQL_REPO: '${{ inputs.repository_name }}'
+      shell: 'bash'
+      run: |
+        command -v gh > /dev/null ;
+        command -v jq > /dev/null ;
+        gql_query='query($repo: String!, $owner: String!, $releases: Int!) { repository(owner: $owner, name: $repo) { releases(first: $releases, orderBy: { field: CREATED_AT, direction: DESC }) { nodes { name, createdAt, publishedAt, updatedAt, tagName, url, isDraft, isPrerelease, isLatest, tag { name, target { oid, commitUrl } } } } } }' ;
+        gql_jq='[ .data.repository.releases.nodes[] | select((.isDraft or .isPrerelease) | not) | del(.isDraft, .isPrerelease) ]' ;
+        mk_delim() { printf -- '"%s_EOF_%d_"' "$1" "${RANDOM}" ; } ;
+        open_ml_var() { local f=''\%'s<<'\%'s\n' ; printf -- "${f}" "$2" "$1" ; } ;
+        close_ml_var() { local f='%s\n' ; printf -- "${f}" "$1" ; } ;
+        {
+          var='releases' ;
+          delim="$(mk_delim "${var}")" ;
+          open_ml_var "${delim}" "${var}" ;
+          gh api graphql --cache 12h \
+            -F owner="${GH_API_GQL_OWNER}" \
+            -F repo="${GH_API_GQL_REPO}" \
+            -F releases="${GH_API_GQL_RELEASES}" \
+            -f query="${gql_query}" --jq "${gql_jq}" ;
+          close_ml_var "${delim}" "${var}" ;
+          jq_arg='map(select(.isLatest))[0]' ;
+          var='latest-release' ;
+          delim="$(mk_delim "${var}")" ;
+          open_ml_var "${delim}" "${var}" ;
+          gh api graphql --cache 12h \
+            -F owner="${GH_API_GQL_OWNER}" \
+            -F repo="${GH_API_GQL_REPO}" \
+            -F releases="${GH_API_GQL_RELEASES}" \
+            -f query="${gql_query}" --jq "${gql_jq}" | jq -c "${jq_arg}" -- ;
+          close_ml_var "${delim}" "${var}" ;
+          unset -v delim jq_arg var ;
+        } >> "${GITHUB_OUTPUT}" ;
+        # Log the human version
+        gh api graphql --cache 12h \
+          -F owner="${GH_API_GQL_OWNER}" \
+          -F repo="${GH_API_GQL_REPO}" \
+          -F releases="${GH_API_GQL_RELEASES}" \
+          -f query="${gql_query}" --jq "${gql_jq}" | jq '.[]' -- ;
diff --git a/.github/sh/library/variables.inc.sh b/.github/sh/library/variables.inc.sh
new file mode 100644
index 00000000..01a5373c
--- /dev/null
+++ b/.github/sh/library/variables.inc.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env sh
+
+# For setting single line variables in the environment or output
+set_sl_var() { local f='%s=%s\n' ; printf -- "${f}" "$@" ; } ;
+
+# Used together to set multiple line variables in the environment or output
+mk_delim() { local f='%s_EOF_%d_' ; printf -- "${f}" "$1" "${RANDOM}" ; } ;
+open_ml_var() { local f=''\%'s<<'\%'s\n' ; printf -- "${f}" "$2" "$1" ; } ;
+close_ml_var() { local f='%s\n' ; printf -- "${f}" "$1" ; } ;
+
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index faf25319..c5c46c5b 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -1,4 +1,4 @@
-name: Run Django tests for TubeSync
+name: CI
 
 env:
   IMAGE_NAME: tubesync
@@ -8,14 +8,82 @@ on:
   push:
     branches:
       - main
+  pull_request:
+    branches:
+      - main
+    types:
+      - opened
+      - reopened
+      - synchronize
+      - ready_for_review
 
 jobs:
+  info:
+    if: ${{ !cancelled() && 'pull_request' != github.event_name }}
+    runs-on: ubuntu-latest
+    outputs:
+      ffmpeg-date: ${{ steps.jq.outputs.FFMPEG_DATE }}
+      ffmpeg-releases: ${{ steps.ffmpeg.outputs.releases }}
+      ffmpeg-version: ${{ steps.jq.outputs.FFMPEG_VERSION }}
+      lowercase-github-actor: ${{ steps.github-actor.outputs.lowercase }}
+      lowercase-github-repository_owner: ${{ steps.github-repository_owner.outputs.lowercase }}
+      ytdlp-latest-release: ${{ steps.yt-dlp.outputs.latest-release }}
+      ytdlp-releases: ${{ steps.yt-dlp.outputs.releases }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Lowercase github username
+        id: github-actor
+        uses: ./.github/actions/string-case
+        with:
+          string: ${{ github.actor }}
+      - name: Lowercase github repository owner
+        id: github-repository_owner
+        uses: ./.github/actions/string-case
+        with:
+          string: ${{ github.repository_owner }}
+      - name: Retrieve
yt-dlp/FFmpeg-Builds releases with GitHub CLI + id: ffmpeg + uses: ./.github/actions/FFmpeg + - name: Retrieve yt-dlp/yt-dlp releases with GitHub CLI + id: yt-dlp + uses: ./.github/actions/yt-dlp + - name: Set outputs with jq + id: jq + run: | + cat >| .ffmpeg.releases.json <<'EOF' + ${{ steps.ffmpeg.outputs.releases }} + EOF + mk_delim() { local f='%s_EOF_%d_' ; printf -- "${f}" "$1" "${RANDOM}" ; } ; + open_ml_var() { local f=''\%'s<<'\%'s\n' ; printf -- "${f}" "$2" "$1" ; } ; + close_ml_var() { local f='%s\n' ; printf -- "${f}" "$1" ; } ; + { + var='FFMPEG_DATE' ; + delim="$(mk_delim "${var}")" ; + open_ml_var "${delim}" "${var}" ; + jq_arg='[foreach .[] as $release ([{}, []]; [ .[0] + {($release.commit): ([ $release.date ] + (.[0][($release.commit)] // []) ) }, [ .[1][0] // $release.commit ] ] ; .[0][(.[1][0])] ) ][-1][0]' ; + jq -r "${jq_arg}" -- .ffmpeg.releases.json ; + close_ml_var "${delim}" "${var}" ; + + ffmpeg_date="$( jq -r "${jq_arg}" -- .ffmpeg.releases.json )" + + var='FFMPEG_VERSION' ; + delim="$(mk_delim "${var}")" ; + open_ml_var "${delim}" "${var}" ; + jq_arg='.[]|select(.date == $date)|.versions[]|select(startswith("N-"))' ; + jq -r --arg date "${ffmpeg_date}" "${jq_arg}" -- .ffmpeg.releases.json ; + close_ml_var "${delim}" "${var}" ; + unset -v delim jq_arg var ; + } >> "${GITHUB_OUTPUT}" + cat -v "${GITHUB_OUTPUT}" + rm -v -f .ffmpeg.releases.json + test: + if: ${{ !cancelled() && ( 'pull_request' != github.event_name || (! github.event.pull_request.draft) ) }} runs-on: ubuntu-22.04 strategy: fail-fast: false matrix: - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12'] + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] steps: - uses: actions/checkout@v4 - name: Install Python ${{ matrix.python-version }} @@ -26,32 +94,72 @@ jobs: run: | python -m pip install --upgrade pip pip install pipenv - pipenv install --system --skip-lock + PIPENV_VERBOSITY=64 pipenv install --system --skip-lock - name: Set up Django environment - run: cp tubesync/tubesync/local_settings.py.example tubesync/tubesync/local_settings.py + run: | + cp -v -p tubesync/tubesync/local_settings.py.example tubesync/tubesync/local_settings.py + cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/background_task/ patches/background_task/* + cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/yt_dlp/ patches/yt_dlp/* - name: Run Django tests run: cd tubesync && python3 manage.py test --verbosity=2 + containerise: + if: ${{ !cancelled() && 'success' == needs.info.result }} + needs: ['info', 'test'] runs-on: ubuntu-latest + timeout-minutes: 120 steps: - name: Set up QEMU uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx + id: buildx uses: docker/setup-buildx-action@v3 - name: Log into GitHub Container Registry - run: echo "${{ secrets.REGISTRY_ACCESS_TOKEN }}" | docker login https://ghcr.io -u ${{ github.actor }} --password-stdin - - name: Lowercase github username for ghcr - id: string - uses: ASzc/change-string-case-action@v6 + env: + DOCKER_REGISTRY: https://ghcr.io + DOCKER_USERNAME: ${{ github.actor }} + DOCKER_TOKEN: ${{ 'meeb' == github.repository_owner && secrets.REGISTRY_ACCESS_TOKEN || secrets.GITHUB_TOKEN }} + run: echo "${DOCKER_TOKEN}" | docker login --password-stdin --username "${DOCKER_USERNAME}" "${DOCKER_REGISTRY}" + - name: Build image for `dive` + id: build-dive-image + uses: docker/build-push-action@v6 with: - string: ${{ github.actor }} + build-args: | + IMAGE_NAME=${{ env.IMAGE_NAME }} + FFMPEG_DATE=${{ needs.info.outputs.ffmpeg-date }} + 
FFMPEG_VERSION=${{ needs.info.outputs.ffmpeg-version }} + YTDLP_DATE=${{ fromJSON(needs.info.outputs.ytdlp-latest-release).tag.name }} + cache-from: type=gha + load: true + platforms: linux/amd64 + push: false + tags: ghcr.io/${{ needs.info.outputs.lowercase-github-actor }}/${{ env.IMAGE_NAME }}:dive + - name: Analysis with `dive` + run: | + docker run --rm \ + -v /var/run/docker.sock:/var/run/docker.sock \ + 'ghcr.io/wagoodman/dive' \ + 'ghcr.io/${{ needs.info.outputs.lowercase-github-actor }}/${{ env.IMAGE_NAME }}:dive' \ + --ci \ + --highestUserWastedPercent '0.03' \ + --highestWastedBytes '10M' - name: Build and push + id: build-push + timeout-minutes: 60 uses: docker/build-push-action@v6 with: platforms: linux/amd64,linux/arm64 - push: true - tags: ghcr.io/${{ steps.string.outputs.lowercase }}/${{ env.IMAGE_NAME }}:latest - cache-from: type=registry,ref=ghcr.io/${{ steps.string.outputs.lowercase }}/${{ env.IMAGE_NAME }}:latest - cache-to: type=inline + push: ${{ 'success' == needs.test.result && 'meeb' == github.repository_owner && 'pull_request' != github.event_name && 'true' || 'false' }} + tags: ghcr.io/${{ needs.info.outputs.lowercase-github-actor }}/${{ env.IMAGE_NAME }}:latest + cache-from: | + type=registry,ref=ghcr.io/${{ needs.info.outputs.lowercase-github-actor }}/${{ env.IMAGE_NAME }}:latest + type=registry,ref=ghcr.io/${{ needs.info.outputs.lowercase-github-repository_owner }}/${{ env.IMAGE_NAME }}:latest + type=gha + cache-to: | + type=gha,mode=max + ${{ 'meeb' == github.repository_owner && 'pull_request' != github.event_name && 'type=inline' || '' }} build-args: | IMAGE_NAME=${{ env.IMAGE_NAME }} + FFMPEG_DATE=${{ needs.info.outputs.ffmpeg-date }} + FFMPEG_VERSION=${{ needs.info.outputs.ffmpeg-version }} + YTDLP_DATE=${{ fromJSON(needs.info.outputs.ytdlp-latest-release).tag.name }} diff --git a/.gitignore b/.gitignore index 17e61eba..c5cd63bc 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,9 @@ __pycache__/ # C extensions *.so +# vim swap files +.*.swp + # Distribution / packaging .Python build/ diff --git a/Dockerfile b/Dockerfile index c54bf9c8..d3169884 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,7 @@ # syntax=docker/dockerfile:1 # check=error=true -ARG FFMPEG_DATE="2025-02-18-14-16" -ARG FFMPEG_VERSION="N-118500-g08e37fa082" +ARG FFMPEG_VERSION="N" ARG S6_VERSION="3.2.0.2" @@ -20,6 +19,37 @@ ARG FFMPEG_CHECKSUM_ALGORITHM="sha256" ARG S6_CHECKSUM_ALGORITHM="sha256" +FROM debian:${DEBIAN_VERSION} AS tubesync-base + +ARG TARGETARCH + +ENV DEBIAN_FRONTEND="noninteractive" \ + HOME="/root" \ + LANGUAGE="en_US.UTF-8" \ + LANG="en_US.UTF-8" \ + LC_ALL="en_US.UTF-8" \ + TERM="xterm" \ + # Do not include compiled byte-code + PIP_NO_COMPILE=1 \ + PIP_ROOT_USER_ACTION='ignore' + +RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/var/lib/apt \ + --mount=type=cache,id=apt-cache-cache,sharing=private,target=/var/cache/apt \ + # to be careful, ensure that these files aren't from a different architecture + rm -f /var/cache/apt/*cache.bin ; \ + # Update from the network and keep cache + rm -f /etc/apt/apt.conf.d/docker-clean ; \ + set -x && \ + apt-get update && \ + # Install locales + apt-get -y --no-install-recommends install locales && \ + printf -- "en_US.UTF-8 UTF-8\n" > /etc/locale.gen && \ + locale-gen en_US.UTF-8 && \ + # Clean up + apt-get -y autopurge && \ + apt-get -y autoclean && \ + rm -f /var/cache/debconf/*.dat-old + FROM alpine:${ALPINE_VERSION} AS ffmpeg-download ARG FFMPEG_DATE ARG FFMPEG_VERSION @@ -218,52 +248,24 @@ RUN set 
-eu ; \ FROM scratch AS s6-overlay COPY --from=s6-overlay-extracted /s6-overlay-rootfs / -FROM debian:${DEBIAN_VERSION} AS tubesync +FROM tubesync-base AS tubesync ARG S6_VERSION ARG FFMPEG_DATE ARG FFMPEG_VERSION -ENV DEBIAN_FRONTEND="noninteractive" \ - HOME="/root" \ - LANGUAGE="en_US.UTF-8" \ - LANG="en_US.UTF-8" \ - LC_ALL="en_US.UTF-8" \ - TERM="xterm" \ - # Do not include compiled byte-code - PIP_NO_COMPILE=1 \ - PIP_ROOT_USER_ACTION='ignore' \ - S6_CMD_WAIT_FOR_SERVICES_MAXTIME="0" +ARG TARGETARCH ENV S6_VERSION="${S6_VERSION}" \ FFMPEG_DATE="${FFMPEG_DATE}" \ FFMPEG_VERSION="${FFMPEG_VERSION}" -# Install third party software -COPY --from=s6-overlay / / -COPY --from=ffmpeg /usr/local/bin/ /usr/local/bin/ - # Reminder: the SHELL handles all variables -RUN --mount=type=cache,id=apt-lib-cache,sharing=locked,target=/var/lib/apt \ - --mount=type=cache,id=apt-cache-cache,sharing=locked,target=/var/cache/apt \ +RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/var/lib/apt \ + --mount=type=cache,id=apt-cache-cache,sharing=private,target=/var/cache/apt \ set -x && \ - # Update from the network and keep cache - rm -f /etc/apt/apt.conf.d/docker-clean && \ apt-get update && \ - # Install locales - apt-get -y --no-install-recommends install locales && \ - printf -- "en_US.UTF-8 UTF-8\n" > /etc/locale.gen && \ - locale-gen en_US.UTF-8 && \ - # Install file - apt-get -y --no-install-recommends install file && \ - # Installed s6 (using COPY earlier) - file -L /command/s6-overlay-suexec && \ - # Installed ffmpeg (using COPY earlier) - /usr/local/bin/ffmpeg -version && \ - file /usr/local/bin/ff* && \ - # Clean up file - apt-get -y autoremove --purge file && \ # Install dependencies we keep # Install required distro packages apt-get -y --no-install-recommends install \ @@ -275,31 +277,56 @@ RUN --mount=type=cache,id=apt-lib-cache,sharing=locked,target=/var/lib/apt \ pipenv \ pkgconf \ python3 \ + python3-libsass \ + python3-socks \ python3-wheel \ - redis-server \ curl \ less \ && \ + # Link to the current python3 version + ln -v -s -f -T "$(find /usr/local/lib -name 'python3.[0-9]*' -type d -printf '%P\n' | sort -r -V | head -n 1)" /usr/local/lib/python3 && \ + # Create a 'app' user which the application will run as + groupadd app && \ + useradd -M -d /app -s /bin/false -g app app && \ # Clean up apt-get -y autopurge && \ apt-get -y autoclean && \ - rm -rf /tmp/* + rm -v -f /var/cache/debconf/*.dat-old -# Copy over pip.conf to use piwheels -COPY pip.conf /etc/pip.conf +# Install third party software +COPY --from=s6-overlay / / +COPY --from=ffmpeg /usr/local/bin/ /usr/local/bin/ + +RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/var/lib/apt \ + --mount=type=cache,id=apt-cache-cache,sharing=private,target=/var/cache/apt \ + set -x && \ + apt-get update && \ + # Install file + apt-get -y --no-install-recommends install file && \ + # Installed s6 (using COPY earlier) + file -L /command/s6-overlay-suexec && \ + # Installed ffmpeg (using COPY earlier) + /usr/local/bin/ffmpeg -version && \ + file /usr/local/bin/ff* && \ + # Clean up file + apt-get -y autoremove --purge file && \ + # Clean up + apt-get -y autopurge && \ + apt-get -y autoclean && \ + rm -v -f /var/cache/debconf/*.dat-old # Switch workdir to the the app WORKDIR /app +ARG YTDLP_DATE + # Set up the app RUN --mount=type=tmpfs,target=/cache \ --mount=type=cache,id=pipenv-cache,sharing=locked,target=/cache/pipenv \ - --mount=type=cache,id=apt-lib-cache,sharing=locked,target=/var/lib/apt \ 
- --mount=type=cache,id=apt-cache-cache,sharing=locked,target=/var/cache/apt \ + --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/var/lib/apt \ + --mount=type=cache,id=apt-cache-cache,sharing=private,target=/var/cache/apt \ --mount=type=bind,source=Pipfile,target=/app/Pipfile \ set -x && \ - # Update from the network and keep cache - rm -f /etc/apt/apt.conf.d/docker-clean && \ apt-get update && \ # Install required build packages apt-get -y --no-install-recommends install \ @@ -315,14 +342,12 @@ RUN --mount=type=tmpfs,target=/cache \ python3-pip \ zlib1g-dev \ && \ - # Create a 'app' user which the application will run as - groupadd app && \ - useradd -M -d /app -s /bin/false -g app app && \ # Install non-distro packages cp -at /tmp/ "${HOME}" && \ HOME="/tmp/${HOME#/}" \ XDG_CACHE_HOME='/cache' \ PIPENV_VERBOSITY=64 \ + PYTHONPYCACHEPREFIX=/cache/pycache \ pipenv install --system --skip-lock && \ # Clean up apt-get -y autoremove --purge \ @@ -340,8 +365,20 @@ RUN --mount=type=tmpfs,target=/cache \ && \ apt-get -y autopurge && \ apt-get -y autoclean && \ + rm -v -f /var/cache/debconf/*.dat-old && \ rm -v -rf /tmp/* +# Copy root +COPY config/root / + +# patch background_task +COPY patches/background_task/ \ + /usr/local/lib/python3/dist-packages/background_task/ + +# patch yt_dlp +COPY patches/yt_dlp/ \ + /usr/local/lib/python3/dist-packages/yt_dlp/ + # Copy app COPY tubesync /app COPY tubesync/tubesync/local_settings.py.container /app/tubesync/local_settings.py @@ -359,30 +396,20 @@ RUN set -x && \ mkdir -v -p /config/cache/pycache && \ mkdir -v -p /downloads/audio && \ mkdir -v -p /downloads/video && \ - # Link to the current python3 version - ln -v -s -f -T "$(find /usr/local/lib -name 'python3.[0-9]*' -type d -printf '%P\n' | sort -r -V | head -n 1)" /usr/local/lib/python3 && \ + # Check nginx configuration copied from config/root/etc + nginx -t && \ # Append software versions ffmpeg_version=$(/usr/local/bin/ffmpeg -version | awk -v 'ev=31' '1 == NR && "ffmpeg" == $1 { print $3; ev=0; } END { exit ev; }') && \ test -n "${ffmpeg_version}" && \ printf -- "ffmpeg_version = '%s'\n" "${ffmpeg_version}" >> /app/common/third_party_versions.py -# Copy root -COPY config/root / - -# patch background_task -COPY patches/background_task/ \ - /usr/local/lib/python3/dist-packages/background_task/ - -# patch yt_dlp -COPY patches/yt_dlp/ \ - /usr/local/lib/python3/dist-packages/yt_dlp/ - # Create a healthcheck HEALTHCHECK --interval=1m --timeout=10s --start-period=3m CMD ["/app/healthcheck.py", "http://127.0.0.1:8080/healthcheck"] # ENVS and ports ENV PYTHONPATH="/app" \ PYTHONPYCACHEPREFIX="/config/cache/pycache" \ + S6_CMD_WAIT_FOR_SERVICES_MAXTIME="0" \ XDG_CACHE_HOME="/config/cache" EXPOSE 4848 diff --git a/Pipfile b/Pipfile index af67a7a1..2976db2e 100644 --- a/Pipfile +++ b/Pipfile @@ -7,20 +7,20 @@ verify_ssl = true autopep8 = "*" [packages] -django = "~=3.2" -django-sass-processor = "*" -libsass = "*" +django = "<5.2" +django-sass-processor = {extras = ["management-command"], version = "*"} pillow = "*" whitenoise = "*" gunicorn = "*" -django-compressor = "*" httptools = "*" -django-background-tasks = "*" +django-background-tasks = ">=1.2.8" django-basicauth = "*" -psycopg2-binary = "*" +psycopg = {extras = ["binary", "pool"], version = "*"} mysqlclient = "*" -yt-dlp = "*" -redis = "*" -hiredis = "*" +PySocks = "*" +urllib3 = {extras = ["socks"], version = "*"} requests = {extras = ["socks"], version = "*"} +yt-dlp = {extras = ["default", "curl-cffi"], version = "*"} 
emoji = "*" +brotli = "*" +html5lib = "*" diff --git a/README.md b/README.md index af3cd910..17367a4a 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ currently just Plex, to complete the PVR experience. TubeSync is designed to be run in a container, such as via Docker or Podman. It also works in a Docker Compose stack. `amd64` (most desktop PCs and servers) and `arm64` -(modern ARM computers, such as the Rasperry Pi 3 or later) are supported. +(modern ARM computers, such as the Raspberry Pi 3 or later) are supported. Example (with Docker on *nix): @@ -356,7 +356,7 @@ etc.). Configuration of this is beyond the scope of this README. Only two are supported, for the moment: - `amd64` (most desktop PCs and servers) - `arm64` -(modern ARM computers, such as the Rasperry Pi 3 or later) +(modern ARM computers, such as the Raspberry Pi 3 or later) Others may be made available, if there is demand. diff --git a/config/root/etc/nginx/nginx.conf b/config/root/etc/nginx/nginx.conf index f09c02e1..e6b99b68 100644 --- a/config/root/etc/nginx/nginx.conf +++ b/config/root/etc/nginx/nginx.conf @@ -6,49 +6,89 @@ worker_cpu_affinity auto; pid /run/nginx.pid; events { - worker_connections 1024; + worker_connections 1024; } http { - # Basic settings - sendfile on; - tcp_nopush on; - tcp_nodelay on; - keepalive_timeout 300; - types_hash_max_size 2048; - server_tokens off; - server_names_hash_bucket_size 64; - server_name_in_redirect off; - client_body_in_file_only clean; - client_body_buffer_size 32K; - client_max_body_size 100M; - send_timeout 300s; - large_client_header_buffers 4 8k; + # Basic settings + sendfile on; + tcp_nopush on; + tcp_nodelay on; + keepalive_timeout 300; + types_hash_max_size 2048; + server_tokens off; + server_names_hash_bucket_size 64; + server_name_in_redirect off; + client_body_in_file_only clean; + client_body_buffer_size 32K; + client_max_body_size 100M; + send_timeout 300s; + large_client_header_buffers 4 8k; - # Mime type handling - include /etc/nginx/mime.types; - default_type application/octet-stream; + # Mime type handling + include /etc/nginx/mime.types; + default_type application/octet-stream; - # Default security headers - add_header X-Frame-Options SAMEORIGIN; - add_header X-Content-Type-Options nosniff; - add_header X-XSS-Protection "1; mode=block"; + # Default security headers + add_header X-Frame-Options SAMEORIGIN; + add_header X-Content-Type-Options nosniff; + add_header X-XSS-Protection "1; mode=block"; - # Logging - log_format host '$remote_addr - $remote_user [$time_local] "[$host] $request" $status $bytes_sent "$http_referer" "$http_user_agent" "$gzip_ratio"'; - access_log /dev/stdout; - error_log stderr; + # Logging + log_format host '$remote_addr - $remote_user [$time_local] "[$host] $request" $status $bytes_sent "$http_referer" "$http_user_agent" "$gzip_ratio"'; + access_log /dev/stdout; + error_log stderr; - # GZIP - gzip on; - gzip_disable "msie6"; - gzip_vary on; - gzip_proxied any; - gzip_comp_level 6; - gzip_buffers 16 8k; - gzip_http_version 1.1; - gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript; + # GZIP + gzip on; + gzip_disable "msie6"; + gzip_vary on; + gzip_proxied any; + gzip_comp_level 6; + gzip_buffers 16 8k; + gzip_http_version 1.1; + gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript; + + # Caching proxy requests + proxy_cache_lock on; + proxy_cache_use_stale updating; + # 
temporary files in shared memory + proxy_temp_path /dev/shm/nginx-tmp 1; + # change this to /config/cache/nginx for a persistent cache + proxy_cache_path /dev/shm/nginx-cache levels=1:2:2 keys_zone=gunicorn:4m inactive=48h max_size=256m min_free=16m; + + # X-Forwarded-Host (pass-through, or set) + map $http_x_forwarded_host $x_forwarded_host { + default $http_x_forwarded_host; + "" $http_host; + } + + # X-Forwarded-Proto (pass-through, or set) + map $http_x_forwarded_proto $x_forwarded_proto { + default $http_x_forwarded_proto; + "" $scheme; + } + + # Set the default port based on X-Forwarded-Proto + map $x_forwarded_proto $default_http_port { + default 80; + "https" 443; + } + + # Extract the remote port from the HTTP Host header. + # Uses default_http_port from above, + # when no port was found in the header. + map $http_host $x_remote_port { + default $default_http_port; + "~^[^\:]+:(?
\d+)$" $p; + } + + # X-Forwarded-Port (pass-through, or set) + map $http_x_forwarded_port $x_forwarded_port { + default $http_x_forwarded_port; + "" $x_remote_port; + } # Site server { @@ -71,13 +111,21 @@ http { # Authentication and proxying location / { proxy_pass http://127.0.0.1:8080; - proxy_set_header Host localhost; - proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Host localhost:$proxy_port; + proxy_set_header X-Forwarded-Host $x_forwarded_host; + proxy_set_header X-Forwarded-Port $x_forwarded_port; + proxy_set_header X-Forwarded-Proto $x_forwarded_proto; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Real-IP $remote_addr; proxy_redirect off; - proxy_read_timeout 59; - proxy_connect_timeout 10; + + # this read timeout should be lower than gunicorn's timeout + proxy_read_timeout 89s; + proxy_connect_timeout 10s; + + # cache long running web requests + proxy_cache gunicorn; + proxy_cache_lock_timeout 88s; } # File dwnload and streaming diff --git a/config/root/etc/redis/redis.conf b/config/root/etc/redis/redis.conf deleted file mode 100644 index 8e411e80..00000000 --- a/config/root/etc/redis/redis.conf +++ /dev/null @@ -1,46 +0,0 @@ -bind 127.0.0.1 -protected-mode yes -port 6379 -tcp-backlog 511 -timeout 0 -tcp-keepalive 300 -daemonize no -supervised no -loglevel notice -logfile "" -databases 1 -always-show-logo no -save "" -dir /var/lib/redis -maxmemory 64mb -maxmemory-policy noeviction -lazyfree-lazy-eviction no -lazyfree-lazy-expire no -lazyfree-lazy-server-del no -replica-lazy-flush no -lazyfree-lazy-user-del no -oom-score-adj no -oom-score-adj-values 0 200 800 -appendonly no -appendfsync no -lua-time-limit 5000 -slowlog-log-slower-than 10000 -slowlog-max-len 128 -latency-monitor-threshold 0 -notify-keyspace-events "" -hash-max-ziplist-entries 512 -hash-max-ziplist-value 64 -list-max-ziplist-size -2 -list-compress-depth 0 -set-max-intset-entries 512 -zset-max-ziplist-entries 128 -zset-max-ziplist-value 64 -hll-sparse-max-bytes 3000 -stream-node-max-bytes 4096 -stream-node-max-entries 100 -activerehashing yes -client-output-buffer-limit normal 0 0 0 -client-output-buffer-limit replica 256mb 64mb 60 -client-output-buffer-limit pubsub 32mb 8mb 60 -hz 10 -dynamic-hz yes diff --git a/config/root/etc/s6-overlay/s6-rc.d/celery-beat/run b/config/root/etc/s6-overlay/s6-rc.d/celery-beat/run deleted file mode 100755 index 46b03a67..00000000 --- a/config/root/etc/s6-overlay/s6-rc.d/celery-beat/run +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/with-contenv bash - -UMASK_SET=${UMASK_SET:-022} -umask "$UMASK_SET" - -cd /app || exit - -PIDFILE=/run/app/celery-beat.pid -SCHEDULE=/tmp/tubesync-celerybeat-schedule - -if [ -f "${PIDFILE}" ] -then - PID=$(cat $PIDFILE) - echo "Unexpected PID file exists at ${PIDFILE} with PID: ${PID}" - if kill -0 $PID - then - echo "Killing old gunicorn process with PID: ${PID}" - kill -9 $PID - fi - echo "Removing stale PID file: ${PIDFILE}" - rm ${PIDFILE} -fi - -#exec s6-setuidgid app \ -# /usr/local/bin/celery --workdir /app -A tubesync beat --pidfile ${PIDFILE} -s ${SCHEDULE} diff --git a/config/root/etc/s6-overlay/s6-rc.d/celery-worker/run b/config/root/etc/s6-overlay/s6-rc.d/celery-worker/run deleted file mode 100755 index 04e2a32c..00000000 --- a/config/root/etc/s6-overlay/s6-rc.d/celery-worker/run +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/with-contenv bash - -UMASK_SET=${UMASK_SET:-022} -umask "$UMASK_SET" - -cd /app || exit - -PIDFILE=/run/app/celery-worker.pid - -if [ -f "${PIDFILE}" ] -then - PID=$(cat 
$PIDFILE) - echo "Unexpected PID file exists at ${PIDFILE} with PID: ${PID}" - if kill -0 $PID - then - echo "Killing old gunicorn process with PID: ${PID}" - kill -9 $PID - fi - echo "Removing stale PID file: ${PIDFILE}" - rm ${PIDFILE} -fi - -#exec s6-setuidgid app \ -# /usr/local/bin/celery --workdir /app -A tubesync worker --pidfile ${PIDFILE} -l INFO diff --git a/config/root/etc/s6-overlay/s6-rc.d/redis/run b/config/root/etc/s6-overlay/s6-rc.d/redis/run deleted file mode 100755 index 09edec81..00000000 --- a/config/root/etc/s6-overlay/s6-rc.d/redis/run +++ /dev/null @@ -1,4 +0,0 @@ -#!/command/with-contenv bash - -exec s6-setuidgid redis \ - /usr/bin/redis-server /etc/redis/redis.conf diff --git a/config/root/etc/s6-overlay/s6-rc.d/celery-beat/dependencies b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/dependencies similarity index 100% rename from config/root/etc/s6-overlay/s6-rc.d/celery-beat/dependencies rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/dependencies diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/down-signal b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/down-signal new file mode 100644 index 00000000..d751378e --- /dev/null +++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/down-signal @@ -0,0 +1 @@ +SIGINT diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run new file mode 100755 index 00000000..03b75ea8 --- /dev/null +++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run @@ -0,0 +1,5 @@ +#!/command/with-contenv bash + +exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \ + /usr/bin/python3 /app/manage.py process_tasks \ + --queue database diff --git a/config/root/etc/s6-overlay/s6-rc.d/celery-beat/type b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/type similarity index 100% rename from config/root/etc/s6-overlay/s6-rc.d/celery-beat/type rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/type diff --git a/config/root/etc/s6-overlay/s6-rc.d/celery-worker/dependencies b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/dependencies similarity index 100% rename from config/root/etc/s6-overlay/s6-rc.d/celery-worker/dependencies rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/dependencies diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/down-signal b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/down-signal new file mode 100644 index 00000000..d751378e --- /dev/null +++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/down-signal @@ -0,0 +1 @@ +SIGINT diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run new file mode 100755 index 00000000..0642054d --- /dev/null +++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run @@ -0,0 +1,5 @@ +#!/command/with-contenv bash + +exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \ + /usr/bin/python3 /app/manage.py process_tasks \ + --queue filesystem diff --git a/config/root/etc/s6-overlay/s6-rc.d/celery-worker/type b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/type similarity index 100% rename from config/root/etc/s6-overlay/s6-rc.d/celery-worker/type rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/type diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-init/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-init/run index 4ac5ff8e..ff0d4d55 100755 --- 
a/config/root/etc/s6-overlay/s6-rc.d/tubesync-init/run +++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-init/run @@ -1,10 +1,8 @@ #!/command/with-contenv bash # Change runtime user UID and GID -PUID="${PUID:-911}" -PUID="${PUID:-911}" -groupmod -o -g "$PGID" app -usermod -o -u "$PUID" app +groupmod -o -g "${PGID:=911}" app +usermod -o -u "${PUID:=911}" app # Reset permissions chown -R app:app /run/app @@ -13,12 +11,10 @@ chown -R app:app /config chmod -R 0755 /config chown -R root:app /app chmod -R 0750 /app +chmod 0755 /app/*.py /app/*.sh +find /app -mindepth 2 -type f -execdir chmod 640 '{}' + chown -R app:app /app/common/static -chmod -R 0750 /app/common/static chown -R app:app /app/static -chmod -R 0750 /app/static -find /app -type f ! -iname healthcheck.py -exec chmod 640 {} \; -chmod 0755 /app/healthcheck.py # Optionally reset the download dir permissions if [ "${TUBESYNC_RESET_DOWNLOAD_DIR:=True}" == "True" ] diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/dependencies b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/dependencies similarity index 100% rename from config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/dependencies rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/dependencies diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/down-signal b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/down-signal new file mode 100644 index 00000000..d751378e --- /dev/null +++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/down-signal @@ -0,0 +1 @@ +SIGINT diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run similarity index 53% rename from config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/run rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run index b2c3a841..a9c17d49 100755 --- a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/run +++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run @@ -1,4 +1,5 @@ #!/command/with-contenv bash exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \ - /usr/bin/python3 /app/manage.py process_tasks + /usr/bin/python3 /app/manage.py process_tasks \ + --queue network diff --git a/config/root/etc/s6-overlay/s6-rc.d/redis/type b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/type similarity index 100% rename from config/root/etc/s6-overlay/s6-rc.d/redis/type rename to config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/type diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/type b/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/type deleted file mode 100644 index 1780f9f4..00000000 --- a/config/root/etc/s6-overlay/s6-rc.d/tubesync-worker/type +++ /dev/null @@ -1 +0,0 @@ -longrun \ No newline at end of file diff --git a/config/root/etc/s6-overlay/s6-rc.d/redis/dependencies b/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-db-worker similarity index 100% rename from config/root/etc/s6-overlay/s6-rc.d/redis/dependencies rename to config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-db-worker diff --git a/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-worker b/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-fs-worker similarity index 100% rename from config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-worker rename to config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-fs-worker diff --git 
a/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-network-worker b/config/root/etc/s6-overlay/s6-rc.d/user/contents.d/tubesync-network-worker new file mode 100644 index 00000000..e69de29b diff --git a/patches/background_task/management/commands/process_tasks.py b/patches/background_task/management/commands/process_tasks.py new file mode 100644 index 00000000..9484c393 --- /dev/null +++ b/patches/background_task/management/commands/process_tasks.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +import logging +import random +import sys +import time + +from django import VERSION +from django.core.management.base import BaseCommand +from django.utils import autoreload + +from background_task.tasks import tasks, autodiscover +from background_task.utils import SignalManager +from django.db import close_old_connections as close_connection + + +logger = logging.getLogger(__name__) + + +def _configure_log_std(): + class StdOutWrapper(object): + def write(self, s): + logger.info(s) + + class StdErrWrapper(object): + def write(self, s): + logger.error(s) + sys.stdout = StdOutWrapper() + sys.stderr = StdErrWrapper() + + +class Command(BaseCommand): + help = 'Run tasks that are scheduled to run on the queue' + + # Command options are specified in an abstract way to enable Django < 1.8 compatibility + OPTIONS = ( + (('--duration', ), { + 'action': 'store', + 'dest': 'duration', + 'type': int, + 'default': 0, + 'help': 'Run task for this many seconds (0 or less to run forever) - default is 0', + }), + (('--sleep', ), { + 'action': 'store', + 'dest': 'sleep', + 'type': float, + 'default': 5.0, + 'help': 'Sleep for this many seconds before checking for new tasks (if none were found) - default is 5', + }), + (('--queue', ), { + 'action': 'store', + 'dest': 'queue', + 'help': 'Only process tasks on this named queue', + }), + (('--log-std', ), { + 'action': 'store_true', + 'dest': 'log_std', + 'help': 'Redirect stdout and stderr to the logging system', + }), + (('--dev', ), { + 'action': 'store_true', + 'dest': 'dev', + 'help': 'Auto-reload your code on changes. Use this only for development', + }), + ) + + if VERSION < (1, 8): + from optparse import make_option + option_list = BaseCommand.option_list + tuple([make_option(*args, **kwargs) for args, kwargs in OPTIONS]) + + # Used in Django >= 1.8 + def add_arguments(self, parser): + for (args, kwargs) in self.OPTIONS: + parser.add_argument(*args, **kwargs) + + def __init__(self, *args, **kwargs): + super(Command, self).__init__(*args, **kwargs) + self.sig_manager = None + self._tasks = tasks + + def run(self, *args, **options): + duration = options.get('duration', 0) + sleep = options.get('sleep', 5.0) + queue = options.get('queue', None) + log_std = options.get('log_std', False) + is_dev = options.get('dev', False) + sig_manager = self.sig_manager + + if is_dev: + # raise last Exception is exist + autoreload.raise_last_exception() + + if log_std: + _configure_log_std() + + autodiscover() + + start_time = time.time() + + while (duration <= 0) or (time.time() - start_time) <= duration: + if sig_manager.kill_now: + # shutting down gracefully + break + + if not self._tasks.run_next_task(queue): + # there were no tasks in the queue, let's recover. + close_connection() + logger.debug('waiting for tasks') + time.sleep(sleep) + else: + # there were some tasks to process, let's check if there is more work to do after a little break. 
+ time.sleep(random.uniform(sig_manager.time_to_wait[0], sig_manager.time_to_wait[1])) + self._tasks._pool_runner._pool.close() + + def handle(self, *args, **options): + is_dev = options.get('dev', False) + self.sig_manager = SignalManager() + if is_dev: + reload_func = autoreload.run_with_reloader + if VERSION < (2, 2): + reload_func = autoreload.main + reload_func(self.run, *args, **options) + else: + self.run(*args, **options) diff --git a/patches/background_task/models.py b/patches/background_task/models.py index 02f7164a..7214308d 100644 --- a/patches/background_task/models.py +++ b/patches/background_task/models.py @@ -1,13 +1,14 @@ # -*- coding: utf-8 -*- -from datetime import timedelta +from datetime import datetime, timedelta, timezone as tz from hashlib import sha1 +from pathlib import Path import json import logging import os import traceback -from compat import StringIO -from compat.models import GenericForeignKey +from io import StringIO +from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.db import models from django.db.models import Q @@ -38,6 +39,23 @@ class TaskQuerySet(models.QuerySet): class TaskManager(models.Manager): + _boot_time = posix_epoch = datetime(1970, 1, 1, tzinfo=tz.utc) + + @property + def boot_time(self): + if self._boot_time > self.posix_epoch: + return self._boot_time + stats = None + boot_time = self.posix_epoch + kcore_path = Path('/proc/kcore') + if kcore_path.exists(): + stats = kcore_path.stat() + if stats: + boot_time += timedelta(seconds=stats.st_mtime) + if boot_time > self._boot_time: + self._boot_time = boot_time + return self._boot_time + def get_queryset(self): return TaskQuerySet(self.model, using=self._db) @@ -50,14 +68,15 @@ class TaskManager(models.Manager): if queue: qs = qs.filter(queue=queue) ready = qs.filter(run_at__lte=now, failed_at=None) - _priority_ordering = '{}priority'.format(app_settings.BACKGROUND_TASK_PRIORITY_ORDERING) + _priority_ordering = '{}priority'.format( + app_settings.BACKGROUND_TASK_PRIORITY_ORDERING) ready = ready.order_by(_priority_ordering, 'run_at') if app_settings.BACKGROUND_TASK_RUN_ASYNC: currently_failed = self.failed().count() currently_locked = self.locked(now).count() count = app_settings.BACKGROUND_TASK_ASYNC_THREADS - \ - (currently_locked - currently_failed) + (currently_locked - currently_failed) if count > 0: ready = ready[:count] else: @@ -68,14 +87,14 @@ class TaskManager(models.Manager): max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME qs = self.get_queryset() expires_at = now - timedelta(seconds=max_run_time) - unlocked = Q(locked_by=None) | Q(locked_at__lt=expires_at) + unlocked = Q(locked_by=None) | Q(locked_at__lt=expires_at) | Q(locked_at__lt=self.boot_time) return qs.filter(unlocked) def locked(self, now): max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME qs = self.get_queryset() expires_at = now - timedelta(seconds=max_run_time) - locked = Q(locked_by__isnull=False) & Q(locked_at__gt=expires_at) + locked = Q(locked_by__isnull=False) & Q(locked_at__gt=expires_at) & Q(locked_at__gt=self.boot_time) return qs.filter(locked) def failed(self): @@ -102,7 +121,8 @@ class TaskManager(models.Manager): s = "%s%s" % (task_name, task_params) task_hash = sha1(s.encode('utf-8')).hexdigest() if remove_existing_tasks: - Task.objects.filter(task_hash=task_hash, locked_at__isnull=True).delete() + Task.objects.filter(task_hash=task_hash, + locked_at__isnull=True).delete() return Task(task_name=task_name, 
task_params=task_params, task_hash=task_hash, @@ -188,14 +208,23 @@ class Task(models.Model): objects = TaskManager() + @property + def nodename(self): + return os.uname().nodename[:(64-10)] + def locked_by_pid_running(self): """ Check if the locked_by process is still running. """ - if self.locked_by: + if self in self.__class__.objects.locked(timezone.now()) and self.locked_by: + pid, nodename = self.locked_by.split('/', 1) + # locked by a process on this node? + if nodename != self.nodename: + return False + # is the process still running? try: - # won't kill the process. kill is a bad named system call - os.kill(int(self.locked_by), 0) + # Signal number zero won't kill the process. + os.kill(int(pid), 0) return True except: return False @@ -218,8 +247,9 @@ class Task(models.Model): def lock(self, locked_by): now = timezone.now() + owner = f'{locked_by[:8]}/{self.nodename}' unlocked = Task.objects.unlocked(now).filter(pk=self.pk) - updated = unlocked.update(locked_by=locked_by, locked_at=now) + updated = unlocked.update(locked_by=owner, locked_at=now) if updated: return Task.objects.get(pk=self.pk) return None @@ -251,13 +281,14 @@ class Task(models.Model): self.failed_at = timezone.now() logger.warning('Marking task %s as failed', self) completed = self.create_completed_task() - task_failed.send(sender=self.__class__, task_id=self.id, completed_task=completed) + task_failed.send(sender=self.__class__, + task_id=self.id, completed_task=completed) self.delete() else: backoff = timedelta(seconds=(self.attempts ** 4) + 5) self.run_at = timezone.now() + backoff logger.warning('Rescheduling task %s for %s later at %s', self, - backoff, self.run_at) + backoff, self.run_at) task_rescheduled.send(sender=self.__class__, task=self) self.locked_by = None self.locked_at = None @@ -330,9 +361,6 @@ class Task(models.Model): db_table = 'background_task' - - - class CompletedTaskQuerySet(models.QuerySet): def created_by(self, creator): @@ -389,7 +417,8 @@ class CompletedTask(models.Model): # when the task should be run run_at = models.DateTimeField(db_index=True) - repeat = models.BigIntegerField(choices=Task.REPEAT_CHOICES, default=Task.NEVER) + repeat = models.BigIntegerField( + choices=Task.REPEAT_CHOICES, default=Task.NEVER) repeat_until = models.DateTimeField(null=True, blank=True) # the "name" of the queue this is to be run on @@ -422,9 +451,14 @@ class CompletedTask(models.Model): Check if the locked_by process is still running. """ if self.locked_by: + pid, node = self.locked_by.split('/', 1) + # locked by a process on this node? + if os.uname().nodename[:(64-10)] != node: + return False + # is the process still running? try: # won't kill the process. kill is a bad named system call - os.kill(int(self.locked_by), 0) + os.kill(int(pid), 0) return True except: return False diff --git a/patches/background_task/utils.py b/patches/background_task/utils.py new file mode 100644 index 00000000..9b607abd --- /dev/null +++ b/patches/background_task/utils.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +import signal +import platform + +TTW_SLOW = [0.5, 1.5] +TTW_FAST = [0.0, 0.1] + + +class SignalManager(): + """Manages POSIX signals.""" + + kill_now = False + time_to_wait = TTW_SLOW + + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + # On Windows, signal() can only be called with: + # SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM, or SIGBREAK. 
+ if platform.system() == 'Windows': + signal.signal(signal.SIGBREAK, self.exit_gracefully) + else: + signal.signal(signal.SIGHUP, self.exit_gracefully) + signal.signal(signal.SIGUSR1, self.speed_up) + signal.signal(signal.SIGUSR2, self.slow_down) + + def exit_gracefully(self, signum, frame): + self.kill_now = True + # Using interrupt again should raise + # a KeyboardInterrupt exception. + signal.signal(signal.SIGINT, signal.SIG_DFL) + + def speed_up(self, signum, frame): + self.time_to_wait = TTW_FAST + + def slow_down(self, signum, frame): + self.time_to_wait = TTW_SLOW diff --git a/patches/yt_dlp/patch/__init__.py b/patches/yt_dlp/patch/__init__.py new file mode 100644 index 00000000..f2d40a97 --- /dev/null +++ b/patches/yt_dlp/patch/__init__.py @@ -0,0 +1,5 @@ +from yt_dlp.compat.compat_utils import passthrough_module + +passthrough_module(__name__, '.patch') +del passthrough_module + diff --git a/patches/yt_dlp/patch/check_thumbnails.py b/patches/yt_dlp/patch/check_thumbnails.py new file mode 100644 index 00000000..25723bb6 --- /dev/null +++ b/patches/yt_dlp/patch/check_thumbnails.py @@ -0,0 +1,43 @@ +from yt_dlp import YoutubeDL +from yt_dlp.utils import sanitize_url, LazyList + +class PatchedYoutubeDL(YoutubeDL): + + def _sanitize_thumbnails(self, info_dict): + thumbnails = info_dict.get('thumbnails') + if thumbnails is None: + thumbnail = info_dict.get('thumbnail') + if thumbnail: + info_dict['thumbnails'] = thumbnails = [{'url': thumbnail}] + if not thumbnails: + return + + + def check_thumbnails(thumbnails): + for t in thumbnails: + self.to_screen(f'[info] Testing thumbnail {t["id"]}: {t["url"]!r}') + try: + self.urlopen(HEADRequest(t['url'])) + except network_exceptions as err: + self.to_screen(f'[info] Unable to connect to thumbnail {t["id"]} URL {t["url"]!r} - {err}. 
Skipping...') + continue + yield t + + + self._sort_thumbnails(thumbnails) + for i, t in enumerate(thumbnails): + if t.get('id') is None: + t['id'] = str(i) + if t.get('width') and t.get('height'): + t['resolution'] = '%dx%d' % (t['width'], t['height']) + t['url'] = sanitize_url(t['url']) + + + if self.params.get('check_thumbnails') is True: + info_dict['thumbnails'] = LazyList(check_thumbnails(thumbnails[::-1]), reverse=True) + else: + info_dict['thumbnails'] = thumbnails + + +YoutubeDL.__unpatched___sanitize_thumbnails = YoutubeDL._sanitize_thumbnails +YoutubeDL._sanitize_thumbnails = PatchedYoutubeDL._sanitize_thumbnails diff --git a/patches/yt_dlp/patch/fatal_http_errors.py b/patches/yt_dlp/patch/fatal_http_errors.py new file mode 100644 index 00000000..442db436 --- /dev/null +++ b/patches/yt_dlp/patch/fatal_http_errors.py @@ -0,0 +1,25 @@ +from yt_dlp.extractor.youtube import YoutubeIE + + +class PatchedYoutubeIE(YoutubeIE): + + def _download_player_responses(self, url, smuggled_data, video_id, webpage_url): + webpage = None + if 'webpage' not in self._configuration_arg('player_skip'): + query = {'bpctr': '9999999999', 'has_verified': '1'} + pp = self._configuration_arg('player_params', [None], casesense=True)[0] + if pp: + query['pp'] = pp + webpage = self._download_webpage_with_retries(webpage_url, video_id, retry_fatal=True, query=query) + + master_ytcfg = self.extract_ytcfg(video_id, webpage) or self._get_default_ytcfg() + + player_responses, player_url = self._extract_player_responses( + self._get_requested_clients(url, smuggled_data), + video_id, webpage, master_ytcfg, smuggled_data) + + return webpage, master_ytcfg, player_responses, player_url + + +YoutubeIE.__unpatched___download_player_responses = YoutubeIE._download_player_responses +YoutubeIE._download_player_responses = PatchedYoutubeIE._download_player_responses diff --git a/pip.conf b/pip.conf deleted file mode 100644 index e92bae15..00000000 --- a/pip.conf +++ /dev/null @@ -1,2 +0,0 @@ -[global] -extra-index-url=https://www.piwheels.org/simple diff --git a/tubesync/common/errors.py b/tubesync/common/errors.py index 130510a7..87d8aa4d 100644 --- a/tubesync/common/errors.py +++ b/tubesync/common/errors.py @@ -14,6 +14,14 @@ class NoFormatException(Exception): pass +class NoMetadataException(Exception): + ''' + Raised when a media item is attempted to be downloaded but it has no valid + metadata. 
+ ''' + pass + + class DownloadFailedException(Exception): ''' Raised when a downloaded media file is expected to be present, but doesn't diff --git a/tubesync/common/logger.py b/tubesync/common/logger.py index a1fcf89a..0bdd52fd 100644 --- a/tubesync/common/logger.py +++ b/tubesync/common/logger.py @@ -1,14 +1,37 @@ import logging from django.conf import settings +from .utils import getenv logging_level = logging.DEBUG if settings.DEBUG else logging.INFO +default_formatter = logging.Formatter( + '%(asctime)s [%(name)s/%(levelname)s] %(message)s' +) +default_sh = logging.StreamHandler() +default_sh.setFormatter(default_formatter) +default_sh.setLevel(logging_level) -log = logging.getLogger('tubesync') -log.setLevel(logging_level) -ch = logging.StreamHandler() -ch.setLevel(logging_level) -formatter = logging.Formatter('%(asctime)s [%(name)s/%(levelname)s] %(message)s') -ch.setFormatter(formatter) -log.addHandler(ch) +app_name = getenv('DJANGO_SETTINGS_MODULE') +first_part = app_name.split('.', 1)[0] +log = app_logger = logging.getLogger(first_part) +app_logger.addHandler(default_sh) +app_logger.setLevel(logging_level) + + +class NoWaitingForTasksFilter(logging.Filter): + def filter(self, record): + return 'waiting for tasks' != record.getMessage() + +background_task_name = 'background_task.management.commands.process_tasks' +last_part = background_task_name.rsplit('.', 1)[-1] +background_task_formatter = logging.Formatter( + f'%(asctime)s [{last_part}/%(levelname)s] %(message)s' +) +background_task_sh = logging.StreamHandler() +background_task_sh.addFilter(NoWaitingForTasksFilter()) +background_task_sh.setFormatter(background_task_formatter) +background_task_sh.setLevel(logging_level) +background_task_logger = logging.getLogger(background_task_name) +background_task_logger.addHandler(background_task_sh) +background_task_logger.setLevel(logging_level) diff --git a/tubesync/common/static/styles/tubesync.scss b/tubesync/common/static/styles/tubesync.scss index 30a41fd8..1012ecfc 100644 --- a/tubesync/common/static/styles/tubesync.scss +++ b/tubesync/common/static/styles/tubesync.scss @@ -29,4 +29,9 @@ html { .help-text > i { padding-right: 6px; -} \ No newline at end of file +} + +.issue-641 { + display: block !important; + overflow-wrap: anywhere; +} diff --git a/tubesync/common/utils.py b/tubesync/common/utils.py index 95efd9f3..5894f0fc 100644 --- a/tubesync/common/utils.py +++ b/tubesync/common/utils.py @@ -1,11 +1,51 @@ +import cProfile +import emoji +import io +import os +import pstats import string +import time from datetime import datetime from urllib.parse import urlunsplit, urlencode, urlparse -import emoji from yt_dlp.utils import LazyList from .errors import DatabaseConnectionError +def getenv(key, default=None, /, *, integer=False, string=True): + ''' + Guarantees a returned type from calling `os.getenv` + The caller can request the integer type, + or use the default string type. 
+ ''' + + args = dict(key=key, default=default, integer=integer, string=string) + supported_types = dict(zip(args.keys(), ( + (str,), # key + ( + bool, + float, + int, + str, + None.__class__, + ), # default + (bool,) * (len(args.keys()) - 2), + ))) + unsupported_type_msg = 'Unsupported type for positional argument, "{}": {}' + for k, t in supported_types.items(): + v = args[k] + assert isinstance(v, t), unsupported_type_msg.format(k, type(v)) + + d = str(default) if default is not None else None + + r = os.getenv(key, d) + if r is None: + if string: r = str() + if integer: r = int() + elif integer: + r = int(float(r)) + return r + + def parse_database_connection_string(database_connection_string): ''' Parses a connection string in a URL style format, such as: @@ -136,3 +176,49 @@ def json_serial(obj): if isinstance(obj, LazyList): return list(obj) raise TypeError(f'Type {type(obj)} is not json_serial()-able') + + +def time_func(func): + def wrapper(*args, **kwargs): + start = time.perf_counter() + result = func(*args, **kwargs) + end = time.perf_counter() + return (result, (end - start, start, end,),) + return wrapper + + +def profile_func(func): + def wrapper(*args, **kwargs): + s = io.StringIO() + with cProfile.Profile() as pr: + pr.enable() + result = func(*args, **kwargs) + pr.disable() + ps = pstats.Stats(pr, stream=s) + ps.sort_stats( + pstats.SortKey.CUMULATIVE + ).print_stats() + return (result, (s.getvalue(), ps, s,),) + return wrapper + + +def remove_enclosed(haystack, /, open='[', close=']', sep=' ', *, valid=None, start=None, end=None): + if not haystack: + return haystack + assert open and close, 'open and close are required to be non-empty strings' + o = haystack.find(open, start, end) + sep = sep or '' + n = close + sep + c = haystack.find(n, len(open)+o, end) + if -1 in {o, c}: + return haystack + if valid is not None: + content = haystack[len(open)+o:c] + found = set(content) + valid = set(valid) + invalid = found - valid + # assert not invalid, f'Invalid characters {invalid} found in: {content}' + if invalid: + return haystack + return haystack[:o] + haystack[len(n)+c:] + diff --git a/tubesync/full_playlist.sh b/tubesync/full_playlist.sh new file mode 100755 index 00000000..08bab14b --- /dev/null +++ b/tubesync/full_playlist.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +playlist_id="${1}" +total_entries="${2}" + +# select YOUTUBE_*DIR settings +# convert None to '' +# convert PosixPath('VALUE') to 'VALUE' +# assign a shell variable with the setting name and value +_awk_prog='$2 == "=" && $1 ~ /^YOUTUBE_/ && $1 ~ /DIR$/ { + sub(/^None$/, "'\'\''", $3); + r = sub(/^PosixPath[(]/, "", $3); + NF--; + if(r) {sub(/[)]$/, "", $NF);}; + $3=$1 $2 $3; $1=$2=""; sub("^" OFS "+", ""); + print; + }' +. 
<(python3 /app/manage.py diffsettings --output hash | awk "${_awk_prog}") +WHERE="${YOUTUBE_DL_CACHEDIR:-/dev/shm}" + +downloaded_entries="$( find /dev/shm "${WHERE}" \ + -path '*/infojson/playlist/postprocessor_*_temp\.info\.json' \ + -name "postprocessor_[[]${playlist_id}[]]_*_${total_entries}_temp\.info\.json" \ + -exec basename '{}' ';' | \ + sed -e 's/^postprocessor_[[].*[]]_//;s/_temp.*\.json$//;' | \ + cut -d '_' -f 1 )" + +find /dev/shm "${WHERE}" \ + -path '*/infojson/playlist/postprocessor_*_temp\.info\.json' \ + -name "postprocessor_[[]${playlist_id}[]]_*_temp\.info\.json" \ + -type f -delete + +if [ 'NA' != "${downloaded_entries:=${3:-NA}}" ] && + [ 'NA' != "${total_entries:-NA}" ] && + [ "${downloaded_entries}" != "${total_entries}" ] +then + exit 1 +fi + +exit 0 diff --git a/tubesync/healthcheck.py b/tubesync/healthcheck.py index 5bc127b0..5cdc63ed 100755 --- a/tubesync/healthcheck.py +++ b/tubesync/healthcheck.py @@ -18,6 +18,8 @@ import requests TIMEOUT = 5 # Seconds HTTP_USER = os.getenv('HTTP_USER') HTTP_PASS = os.getenv('HTTP_PASS') +# never use proxy for healthcheck requests +os.environ['no_proxy'] = '*' def do_heatlhcheck(url): @@ -30,6 +32,9 @@ def do_heatlhcheck(url): if __name__ == '__main__': + # if it is marked as intentionally down, nothing else matters + if os.path.exists('/run/service/gunicorn/down'): + sys.exit(0) try: url = sys.argv[1] except IndexError: diff --git a/tubesync/sync/choices.py b/tubesync/sync/choices.py index f0c6e45a..25dd762a 100644 --- a/tubesync/sync/choices.py +++ b/tubesync/sync/choices.py @@ -132,7 +132,7 @@ class SourceResolution(models.TextChoices): VIDEO_720P = '720p', _('720p (HD)') VIDEO_1080P = '1080p', _('1080p (Full HD)') VIDEO_1440P = '1440p', _('1440p (2K)') - VIDEO_2160P = '2160p', _('4320p (8K)') + VIDEO_2160P = '2160p', _('2160p (4K)') VIDEO_4320P = '4320p', _('4320p (8K)') @classmethod @@ -160,6 +160,12 @@ class SponsorBlock_Category(models.TextChoices): MUSIC_OFFTOPIC = 'music_offtopic', _( 'Non-Music Section' ) +class TaskQueue(models.TextChoices): + DB = 'database', _('Database') + FS = 'filesystem', _('Filesystem') + NET = 'network', _('Networking') + + class YouTube_SourceType(models.TextChoices): CHANNEL = 'c', _('YouTube channel') CHANNEL_ID = 'i', _('YouTube channel by ID') diff --git a/tubesync/sync/fields.py b/tubesync/sync/fields.py index 2910b7cc..2f479b68 100644 --- a/tubesync/sync/fields.py +++ b/tubesync/sync/fields.py @@ -145,6 +145,9 @@ class CommaSepChoiceField(models.CharField): # The data was lost; we can regenerate it. 
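When a plain string has been assigned to the field, the added lines that follow regenerate `selected_choices` by splitting on the field's separator. A minimal standalone sketch of that step, assuming a ',' separator and example SponsorBlock category values:

# illustrative sketch only: mirrors the split applied to a manually assigned string
separator = ','
value = 'sponsor,selfpromo,interaction'
selected_choices = value.split(separator) if isinstance(value, str) and value else list(value)
assert selected_choices == ['sponsor', 'selfpromo', 'interaction']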
args_dict = {key: self.__dict__[key] for key in CommaSepChoice._fields} args_dict['selected_choices'] = list(value) + # setting a string manually should not result in characters + if isinstance(value, str) and len(value) > 0: + args_dict['selected_choices'] = value.split(self.separator) data = CommaSepChoice(**args_dict) value = data.selected_choices s_value = super().get_prep_value(value) diff --git a/tubesync/sync/hooks.py b/tubesync/sync/hooks.py index c644da59..3bb3ce0d 100644 --- a/tubesync/sync/hooks.py +++ b/tubesync/sync/hooks.py @@ -2,6 +2,7 @@ import os import yt_dlp from common.logger import log +from common.utils import remove_enclosed from django.conf import settings @@ -81,9 +82,9 @@ class BaseStatus: if self.task_verbose_name is None: # clean up any previously prepended task_status # this happened because of duplicated tasks on my test system - s = task.verbose_name - cleaned = s[1+s.find(' Downloading '):] - self.task_verbose_name = cleaned + self.task_verbose_name = remove_enclosed( + task.verbose_name, '[', ']', ' ', + ) task.verbose_name = f'{self.task_status} {self.task_verbose_name}' task.save() diff --git a/tubesync/sync/management/commands/delete-source.py b/tubesync/sync/management/commands/delete-source.py index 104ec887..2f149a67 100644 --- a/tubesync/sync/management/commands/delete-source.py +++ b/tubesync/sync/management/commands/delete-source.py @@ -5,8 +5,7 @@ from django.core.management.base import BaseCommand, CommandError from django.db.models import signals from common.logger import log from sync.models import Source, Media, MediaServer -from sync.signals import media_post_delete -from sync.tasks import rescan_media_server +from sync.tasks import schedule_media_servers_update class Command(BaseCommand): @@ -29,23 +28,14 @@ class Command(BaseCommand): except Source.DoesNotExist: raise CommandError(f'Source does not exist with ' f'UUID: {source_uuid}') - # Detach post-delete signal for Media so we don't spam media servers - signals.post_delete.disconnect(media_post_delete, sender=Media) + # Reconfigure the source to not update the disk or media servers + source.deactivate() # Delete the source, triggering pre-delete signals for each media item log.info(f'Found source with UUID "{source.uuid}" with name ' f'"{source.name}" and deleting it, this may take some time!') + log.info(f'Source directory: {source.directory_path}') source.delete() # Update any media servers - for mediaserver in MediaServer.objects.all(): - log.info(f'Scheduling media server updates') - verbose_name = _('Request media server rescan for "{}"') - rescan_media_server( - str(mediaserver.pk), - priority=0, - verbose_name=verbose_name.format(mediaserver), - remove_existing_tasks=True - ) - # Re-attach signals - signals.post_delete.connect(media_post_delete, sender=Media) + schedule_media_servers_update() # All done log.info('Done') diff --git a/tubesync/sync/management/commands/import-existing-media.py b/tubesync/sync/management/commands/import-existing-media.py index 6b723e70..7dddc8c4 100644 --- a/tubesync/sync/management/commands/import-existing-media.py +++ b/tubesync/sync/management/commands/import-existing-media.py @@ -16,7 +16,7 @@ class Command(BaseCommand): log.info('Building directory to Source map...') dirmap = {} for s in Source.objects.all(): - dirmap[s.directory_path] = s + dirmap[str(s.directory_path)] = s log.info(f'Scanning sources...') file_extensions = list(FileExtension.values) + self.extra_extensions for sourceroot, source in dirmap.items(): @@ -38,7 +38,8 @@ class 
Command(BaseCommand): ext = ext.strip().lower() if ext not in file_extensions: continue - on_disk.append(str(rootpath / filename)) + filepath = Path(rootpath / filename).resolve(strict=True) + on_disk.append(str(filepath)) filemap = {} for item in media: for filepath in on_disk: @@ -50,7 +51,19 @@ class Command(BaseCommand): for filepath, item in filemap.items(): log.info(f'Matched on-disk file: {filepath} ' f'to media item: {item.source} / {item}') - item.media_file.name = filepath + item.media_file.name = str(Path(filepath).relative_to(item.media_file.storage.location)) item.downloaded = True + item.downloaded_filesize = Path(filepath).stat().st_size + # set a reasonable download date + date = item.metadata_published(Path(filepath).stat().st_mtime) + if item.published and item.published > date: + date = item.published + if item.has_metadata: + metadata_date = item.metadata_published(item.get_metadata_first_value('epoch', 0)) + if metadata_date and metadata_date > date: + date = metadata_date + if item.download_date and item.download_date > date: + date = item.download_date + item.download_date = date item.save() log.info('Done') diff --git a/tubesync/sync/management/commands/reset-tasks.py b/tubesync/sync/management/commands/reset-tasks.py index d65abfc3..3d6f515d 100644 --- a/tubesync/sync/management/commands/reset-tasks.py +++ b/tubesync/sync/management/commands/reset-tasks.py @@ -1,4 +1,5 @@ from django.core.management.base import BaseCommand, CommandError +from django.db.transaction import atomic from django.utils.translation import gettext_lazy as _ from background_task.models import Task from sync.models import Source @@ -14,20 +15,26 @@ class Command(BaseCommand): def handle(self, *args, **options): log.info('Resettings all tasks...') - # Delete all tasks - Task.objects.all().delete() - # Iter all tasks - for source in Source.objects.all(): - # Recreate the initial indexing task - log.info(f'Resetting tasks for source: {source}') - verbose_name = _('Index media from source "{}"') - index_source_task( - str(source.pk), - repeat=source.index_schedule, - queue=str(source.pk), - priority=5, - verbose_name=verbose_name.format(source.name) - ) - # This also chains down to call each Media objects .save() as well - source.save() + with atomic(durable=True): + # Delete all tasks + Task.objects.all().delete() + # Iter all sources, creating new tasks + for source in Source.objects.all(): + verbose_name = _('Check download directory exists for source "{}"') + check_source_directory_exists( + str(source.pk), + verbose_name=verbose_name.format(source.name), + ) + # Recreate the initial indexing task + log.info(f'Resetting tasks for source: {source}') + verbose_name = _('Index media from source "{}"') + index_source_task( + str(source.pk), + repeat=source.index_schedule, + verbose_name=verbose_name.format(source.name), + ) + with atomic(durable=True): + for source in Source.objects.all(): + # This also chains down to call each Media objects .save() as well + source.save() log.info('Done') diff --git a/tubesync/sync/matching.py b/tubesync/sync/matching.py index 9390e6fa..93f7e4d0 100644 --- a/tubesync/sync/matching.py +++ b/tubesync/sync/matching.py @@ -95,6 +95,8 @@ def get_best_video_format(media): continue if not fmt['vcodec']: continue + if any(key[0] not in fmt for key in sort_keys): + continue if media.source.source_resolution.strip().upper() == fmt['format']: video_formats.append(fmt) elif media.source.source_resolution_height == fmt['height']: @@ -236,7 +238,7 @@ def 
get_best_video_format(media): break if not best_match: for fmt in video_formats: - # Check for codec and resolution match bot drop 60fps + # Check for codec and resolution match but drop 60fps if (source_resolution == fmt['format'] and source_vcodec == fmt['vcodec'] and not fmt['is_hdr']): @@ -294,7 +296,7 @@ def get_best_video_format(media): break if not best_match: for fmt in video_formats: - # Check for codec and resolution match bot drop hdr + # Check for codec and resolution match but drop hdr if (source_resolution == fmt['format'] and source_vcodec == fmt['vcodec'] and not fmt['is_60fps']): diff --git a/tubesync/sync/migrations/0028_alter_source_source_resolution.py b/tubesync/sync/migrations/0028_alter_source_source_resolution.py index d3535892..e72f7307 100644 --- a/tubesync/sync/migrations/0028_alter_source_source_resolution.py +++ b/tubesync/sync/migrations/0028_alter_source_source_resolution.py @@ -11,7 +11,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name='source', name='source_resolution', - field=models.CharField(choices=[('audio', 'Audio only'), ('360p', '360p (SD)'), ('480p', '480p (SD)'), ('720p', '720p (HD)'), ('1080p', '1080p (Full HD)'), ('1440p', '1440p (2K)'), ('2160p', '4320p (8K)'), ('4320p', '4320p (8K)')], db_index=True, default='1080p', help_text='Source resolution, desired video resolution to download', max_length=8, verbose_name='source resolution'), + field=models.CharField(choices=[('audio', 'Audio only'), ('360p', '360p (SD)'), ('480p', '480p (SD)'), ('720p', '720p (HD)'), ('1080p', '1080p (Full HD)'), ('1440p', '1440p (2K)'), ('2160p', '2160p (4K)'), ('4320p', '4320p (8K)')], db_index=True, default='1080p', help_text='Source resolution, desired video resolution to download', max_length=8, verbose_name='source resolution'), ), ] diff --git a/tubesync/sync/migrations/0030_alter_source_source_vcodec.py b/tubesync/sync/migrations/0030_alter_source_source_vcodec.py new file mode 100644 index 00000000..2b4f3618 --- /dev/null +++ b/tubesync/sync/migrations/0030_alter_source_source_vcodec.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.8 on 2025-04-07 18:28 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('sync', '0029_alter_mediaserver_fields'), + ] + + operations = [ + migrations.AlterField( + model_name='source', + name='source_vcodec', + field=models.CharField(choices=[('AVC1', 'AVC1 (H.264)'), ('VP9', 'VP9'), ('AV1', 'AV1')], db_index=True, default='VP9', help_text='Source video codec, desired video encoding format to download (ignored if "resolution" is audio only)', max_length=8, verbose_name='source video codec'), + ), + ] diff --git a/tubesync/sync/models.py b/tubesync/sync/models.py index cb02c2cd..c3424bd6 100644 --- a/tubesync/sync/models.py +++ b/tubesync/sync/models.py @@ -2,10 +2,11 @@ import os import uuid import json import re -from xml.etree import ElementTree from collections import OrderedDict +from copy import deepcopy from datetime import datetime, timedelta, timezone as tz from pathlib import Path +from xml.etree import ElementTree from django.conf import settings from django.db import models from django.core.exceptions import SuspiciousOperation @@ -236,7 +237,7 @@ class Source(models.Model): _('source video codec'), max_length=8, db_index=True, - choices=list(reversed(YouTube_VideoCodec.choices[1:])), + choices=list(reversed(YouTube_VideoCodec.choices)), default=YouTube_VideoCodec.VP9, help_text=_('Source video codec, desired video encoding format to 
download (ignored if "resolution" is audio only)') ) @@ -332,6 +333,27 @@ class Source(models.Model): replaced = self.name.replace('_', '-').replace('&', 'and').replace('+', 'and') return slugify(replaced)[:80] + def deactivate(self): + self.download_media = False + self.index_streams = False + self.index_videos = False + self.index_schedule = IndexSchedule.NEVER + self.save(update_fields={ + 'download_media', + 'index_streams', + 'index_videos', + 'index_schedule', + }) + + @property + def is_active(self): + active = ( + self.download_media or + self.index_streams or + self.index_videos + ) + return self.index_schedule and active + @property def is_audio(self): return self.source_resolution == SourceResolution.AUDIO.value @@ -507,7 +529,10 @@ class Source(models.Model): indexer = self.INDEXERS.get(self.source_type, None) if not callable(indexer): raise Exception(f'Source type f"{self.source_type}" has no indexer') - response = indexer(self.get_index_url(type=type)) + days = None + if self.download_cap_date: + days = timedelta(seconds=self.download_cap).days + response = indexer(self.get_index_url(type=type), days=days) if not isinstance(response, dict): return [] entries = response.get('entries', []) @@ -546,6 +571,9 @@ class Media(models.Model): Source. ''' + # Used to convert seconds to datetime + posix_epoch = datetime(1970, 1, 1, tzinfo=tz.utc) + # Format to use to display a URL for the media URLS = _srctype_dict('https://www.youtube.com/watch?v={key}') @@ -768,7 +796,26 @@ class Media(models.Model): ) def save(self, force_insert=False, force_update=False, using=None, update_fields=None): + # Correct the path after a source is renamed + if self.created and self.downloaded and not self.media_file_exists: + fp_list = list((self.filepath,)) + if self.media_file: + # Try the new computed directory + the file base name from the database + fp_list.append(self.filepath.parent / Path(self.media_file.path).name) + for filepath in fp_list: + if filepath.exists(): + self.media_file.name = str( + filepath.relative_to( + self.media_file.storage.location + ) + ) + self.skip = False + if update_fields is not None: + update_fields = {'media_file', 'skip'}.union(update_fields) + # Trigger an update of derived fields from metadata + if update_fields is None or 'metadata' in update_fields: + setattr(self, '_cached_metadata_dict', None) if self.metadata: self.title = self.metadata_title[:200] self.duration = self.metadata_duration @@ -786,6 +833,30 @@ class Media(models.Model): fields = self.METADATA_FIELDS.get(field, {}) return fields.get(self.source.source_type, field) + def get_metadata_first_value(self, iterable, default=None, /): + ''' + fetch the first key with a value from metadata + ''' + + # str is an iterable of characters + # we do not want to look for each character! 
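The guard added immediately below treats a bare string as a single metadata key rather than iterating over its characters. A small standalone sketch of that normalisation, using key names that appear elsewhere in this patch:

# illustrative sketch only: normalise the key argument the way get_metadata_first_value does
def normalise_keys(iterable):
    # a bare str would otherwise be iterated character by character
    if isinstance(iterable, str):
        iterable = (iterable,)
    return tuple(iterable)

assert normalise_keys('fulltitle') == ('fulltitle',)
assert normalise_keys(('fulltitle', 'title')) == ('fulltitle', 'title')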
+ if isinstance(iterable, str): + iterable = (iterable,) + for key in tuple(iterable): + # reminder: unmapped fields return the key itself + field = self.get_metadata_field(key) + value = self.loaded_metadata.get(field) + # value can be None because: + # - None was stored at the key + # - the key was not in the dictionary + # either way, we don't want those values + if value is None: + continue + if isinstance(value, str): + return value.strip() + return value + return default + def iter_formats(self): for fmt in self.formats: yield parse_media_format(fmt) @@ -882,14 +953,19 @@ class Media(models.Model): resolution = self.downloaded_format.lower() elif self.downloaded_height: resolution = f'{self.downloaded_height}p' + if resolution: + fmt.append(resolution) if self.downloaded_format != Val(SourceResolution.AUDIO): vcodec = self.downloaded_video_codec.lower() + if vcodec: fmt.append(vcodec) acodec = self.downloaded_audio_codec.lower() - fmt.append(acodec) + if acodec: + fmt.append(acodec) if self.downloaded_format != Val(SourceResolution.AUDIO): fps = str(self.downloaded_fps) - fmt.append(f'{fps}fps') + if fps: + fmt.append(f'{fps}fps') if self.downloaded_hdr: hdr = 'hdr' fmt.append(hdr) @@ -921,13 +997,19 @@ class Media(models.Model): # Combined vformat = cformat if vformat: - resolution = vformat['format'].lower() - fmt.append(resolution) + if vformat['format']: + resolution = vformat['format'].lower() + else: + resolution = f"{vformat['height']}p" + if resolution: + fmt.append(resolution) vcodec = vformat['vcodec'].lower() - fmt.append(vcodec) + if vcodec: + fmt.append(vcodec) if aformat: acodec = aformat['acodec'].lower() - fmt.append(acodec) + if acodec: + fmt.append(acodec) if vformat: if vformat['is_60fps']: fps = '60fps' @@ -991,28 +1073,49 @@ class Media(models.Model): 'uploader': self.uploader, } + @property def has_metadata(self): return self.metadata is not None + def save_to_metadata(self, key, value, /): + data = self.loaded_metadata + data[key] = value + from common.utils import json_serial + compact_json = json.dumps(data, separators=(',', ':'), default=json_serial) + self.metadata = compact_json + self.save(update_fields={'metadata'}) + from common.logger import log + log.debug(f'Saved to metadata: {self.key} / {self.uuid}: {key=}: {value}') + + @property def reduce_data(self): + now = timezone.now() try: - from common.logger import log - from common.utils import json_serial - - old_mdl = len(self.metadata or "") data = json.loads(self.metadata or "{}") + if '_reduce_data_ran_at' in data.keys(): + total_seconds = data['_reduce_data_ran_at'] + assert isinstance(total_seconds, int), type(total_seconds) + ran_at = self.metadata_published(total_seconds) + if (now - ran_at) < timedelta(hours=1): + return data + + from common.utils import json_serial compact_json = json.dumps(data, separators=(',', ':'), default=json_serial) filtered_data = filter_response(data, True) + filtered_data['_reduce_data_ran_at'] = round((now - self.posix_epoch).total_seconds()) filtered_json = json.dumps(filtered_data, separators=(',', ':'), default=json_serial) except Exception as e: + from common.logger import log log.exception('reduce_data: %s', e) else: + from common.logger import log # log the results of filtering / compacting on metadata size new_mdl = len(compact_json) + old_mdl = len(self.metadata or "") if old_mdl > new_mdl: delta = old_mdl - new_mdl log.info(f'{self.key}: metadata compacted by {delta:,} characters ({old_mdl:,} -> {new_mdl:,})') @@ -1022,34 +1125,57 @@ class 
Media(models.Model): log.info(f'{self.key}: metadata reduced by {delta:,} characters ({old_mdl:,} -> {new_mdl:,})') if getattr(settings, 'SHRINK_OLD_MEDIA_METADATA', False): self.metadata = filtered_json + return filtered_data + return data @property def loaded_metadata(self): + cached = getattr(self, '_cached_metadata_dict', None) + if cached: + return deepcopy(cached) + data = None if getattr(settings, 'SHRINK_OLD_MEDIA_METADATA', False): - self.reduce_data + data = self.reduce_data try: - data = json.loads(self.metadata) + if not data: + data = json.loads(self.metadata or "{}") if not isinstance(data, dict): return {} + setattr(self, '_cached_metadata_dict', data) return data except Exception as e: return {} + @property def refresh_formats(self): + if not self.has_metadata: + return data = self.loaded_metadata metadata_seconds = data.get('epoch', None) if not metadata_seconds: self.metadata = None + self.save(update_fields={'metadata'}) return False now = timezone.now() - formats_seconds = data.get('formats_epoch', metadata_seconds) + attempted_key = '_refresh_formats_attempted' + attempted_seconds = data.get(attempted_key) + if attempted_seconds: + # skip for recent unsuccessful refresh attempts also + attempted_dt = self.metadata_published(attempted_seconds) + if (now - attempted_dt) < timedelta(seconds=self.source.index_schedule): + return False + # skip for recent successful formats refresh + refreshed_key = 'formats_epoch' + formats_seconds = data.get(refreshed_key, metadata_seconds) metadata_dt = self.metadata_published(formats_seconds) if (now - metadata_dt) < timedelta(seconds=self.source.index_schedule): return False + last_attempt = round((now - self.posix_epoch).total_seconds()) + self.save_to_metadata(attempted_key, last_attempt) self.skip = False metadata = self.index_metadata() if self.skip: @@ -1060,16 +1186,13 @@ class Media(models.Model): response = filter_response(metadata, True) field = self.get_metadata_field('formats') - data[field] = response.get(field, []) + self.save_to_metadata(field, response.get(field, [])) + self.save_to_metadata(refreshed_key, response.get('epoch', formats_seconds)) if data.get('availability', 'public') != response.get('availability', 'public'): - data['availability'] = response.get('availability', 'public') - data['formats_epoch'] = response.get('epoch', formats_seconds) - - from common.utils import json_serial - compact_json = json.dumps(data, separators=(',', ':'), default=json_serial) - self.metadata = compact_json + self.save_to_metadata('availability', response.get('availability', 'public')) return True + @property def url(self): url = self.URLS.get(self.source.source_type, '') @@ -1077,33 +1200,24 @@ class Media(models.Model): @property def description(self): - field = self.get_metadata_field('description') - return self.loaded_metadata.get(field, '').strip() + return self.get_metadata_first_value('description', '') @property def metadata_title(self): - result = '' - for key in ('fulltitle', 'title'): - field = self.get_metadata_field(key) - value = self.loaded_metadata.get(field, '').strip() - if value: - result = value - break - return result + return self.get_metadata_first_value(('fulltitle', 'title',), '') def metadata_published(self, timestamp=None): - published_dt = None if timestamp is None: - field = self.get_metadata_field('timestamp') - timestamp = self.loaded_metadata.get(field, None) + timestamp = self.get_metadata_first_value('timestamp') if timestamp is not None: try: timestamp_float = float(timestamp) - posix_epoch = 
datetime(1970, 1, 1, tzinfo=tz.utc) - published_dt = posix_epoch + timedelta(seconds=timestamp_float) except Exception as e: log.warn(f'Could not compute published from timestamp for: {self.source} / {self} with "{e}"') - return published_dt + pass + else: + return self.posix_epoch + timedelta(seconds=timestamp_float) + return None @property def slugtitle(self): @@ -1112,8 +1226,8 @@ class Media(models.Model): @property def thumbnail(self): - field = self.get_metadata_field('thumbnail') - return self.loaded_metadata.get(field, '').strip() + default = f'https://i.ytimg.com/vi/{self.key}/maxresdefault.jpg' + return self.get_metadata_first_value('thumbnail', default) @property def name(self): @@ -1122,20 +1236,19 @@ class Media(models.Model): @property def upload_date(self): - field = self.get_metadata_field('upload_date') - try: - upload_date_str = self.loaded_metadata.get(field, '').strip() - except (AttributeError, ValueError) as e: + upload_date_str = self.get_metadata_first_value('upload_date') + if not upload_date_str: return None try: return datetime.strptime(upload_date_str, '%Y%m%d') except (AttributeError, ValueError) as e: - return None + log.debug(f'Media.upload_date: {self.source} / {self}: strptime: {e}') + pass + return None @property def metadata_duration(self): - field = self.get_metadata_field('duration') - duration = self.loaded_metadata.get(field, 0) + duration = self.get_metadata_first_value('duration', 0) try: duration = int(duration) except (TypeError, ValueError): @@ -1151,52 +1264,45 @@ class Media(models.Model): @property def categories(self): - field = self.get_metadata_field('categories') - return self.loaded_metadata.get(field, []) + return self.get_metadata_first_value('categories', list()) @property def rating(self): - field = self.get_metadata_field('rating') - return self.loaded_metadata.get(field, 0) + return self.get_metadata_first_value('rating', 0) @property def votes(self): - field = self.get_metadata_field('upvotes') - upvotes = self.loaded_metadata.get(field, 0) + upvotes = self.get_metadata_first_value('upvotes', 0) if not isinstance(upvotes, int): upvotes = 0 - field = self.get_metadata_field('downvotes') - downvotes = self.loaded_metadata.get(field, 0) + downvotes = self.get_metadata_first_value('downvotes', 0) if not isinstance(downvotes, int): downvotes = 0 return upvotes + downvotes @property def age_limit(self): - field = self.get_metadata_field('age_limit') - return self.loaded_metadata.get(field, 0) + return self.get_metadata_first_value('age_limit', 0) @property def uploader(self): - field = self.get_metadata_field('uploader') - return self.loaded_metadata.get(field, '') + return self.get_metadata_first_value('uploader', '') @property def formats(self): - field = self.get_metadata_field('formats') - return self.loaded_metadata.get(field, []) + return self.get_metadata_first_value('formats', list()) @property def playlist_title(self): - field = self.get_metadata_field('playlist_title') - return self.loaded_metadata.get(field, '') + return self.get_metadata_first_value('playlist_title', '') @property def filename(self): # Create a suitable filename from the source media_format media_format = str(self.source.media_format) media_details = self.format_dict - return media_format.format(**media_details) + result = media_format.format(**media_details) + return '.' 
+ result if '/' == result[0] else result @property def directory_path(self): @@ -1448,17 +1554,35 @@ class Media(models.Model): def calculate_episode_number(self): if self.source.is_playlist: - sorted_media = Media.objects.filter(source=self.source) + sorted_media = Media.objects.filter( + source=self.source, + metadata__isnull=False, + ).order_by( + 'published', + 'created', + 'key', + ) else: - self_year = self.upload_date.year if self.upload_date else self.created.year - filtered_media = Media.objects.filter(source=self.source, published__year=self_year) - filtered_media = [m for m in filtered_media if m.upload_date is not None] - sorted_media = sorted(filtered_media, key=lambda x: (x.upload_date, x.key)) - position_counter = 1 - for media in sorted_media: + self_year = self.created.year # unlikely to be accurate + if self.published: + self_year = self.published.year + elif self.has_metadata and self.upload_date: + self_year = self.upload_date.year + elif self.download_date: + # also, unlikely to be accurate + self_year = self.download_date.year + sorted_media = Media.objects.filter( + source=self.source, + metadata__isnull=False, + published__year=self_year, + ).order_by( + 'published', + 'created', + 'key', + ) + for counter, media in enumerate(sorted_media, start=1): if media == self: - return position_counter - position_counter += 1 + return counter def get_episode_str(self, use_padding=False): episode_number = self.calculate_episode_number() @@ -1503,19 +1627,27 @@ class Media(models.Model): # update the media_file in the db self.media_file.name = str(new_video_path.relative_to(self.media_file.storage.location)) - self.save() + self.skip = False + self.save(update_fields=('media_file', 'skip')) log.info(f'Updated "media_file" in the database for: {self!s}') (new_prefix_path, new_stem) = directory_and_stem(new_video_path) # move and change names to match stem for other_path in other_paths: + # it should exist, but check anyway + if not other_path.exists(): + continue + old_file_str = other_path.name new_file_str = new_stem + old_file_str[len(old_stem):] new_file_path = Path(new_prefix_path / new_file_str) + if new_file_path == other_path: + continue log.debug(f'Considering replace for: {self!s}\n\t{other_path!s}\n\t{new_file_path!s}') - # it should exist, but check anyway - if other_path.exists(): + # do not move the file we just updated in the database + # doing that loses track of the `Media.media_file` entirely + if not new_video_path.samefile(other_path): log.debug(f'{self!s}: {other_path!s} => {new_file_path!s}') other_path.replace(new_file_path) @@ -1524,6 +1656,8 @@ class Media(models.Model): old_file_str = fuzzy_path.name new_file_str = new_stem + old_file_str[len(fuzzy_stem):] new_file_path = Path(new_prefix_path / new_file_str) + if new_file_path == fuzzy_path: + continue log.debug(f'Considering rename for: {self!s}\n\t{fuzzy_path!s}\n\t{new_file_path!s}') # it quite possibly was renamed already if fuzzy_path.exists() and not new_file_path.exists(): @@ -1537,8 +1671,9 @@ class Media(models.Model): # try to remove empty dirs parent_dir = old_video_path.parent + stop_dir = self.source.directory_path try: - while parent_dir.is_dir(): + while parent_dir.is_relative_to(stop_dir): parent_dir.rmdir() log.info(f'Removed empty directory: {parent_dir!s}') parent_dir = parent_dir.parent diff --git a/tubesync/sync/signals.py b/tubesync/sync/signals.py index 5800c5ce..4c332eca 100644 --- a/tubesync/sync/signals.py +++ b/tubesync/sync/signals.py @@ -1,4 +1,6 @@ from pathlib import Path 
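The reworked `calculate_episode_number` above orders sibling media by published date, creation date and key, then returns the 1-based position of the current item. A simplified standalone sketch of that ordering, with a stand-in `Item` class in place of the Media queryset:

# simplified sketch of the ordering used by calculate_episode_number; Item is a stand-in
from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class Item:
    published: datetime
    created: datetime
    key: str

def episode_number(target, items):
    # 1-based position within items ordered by (published, created, key)
    ordered = sorted(items, key=lambda m: (m.published, m.created, m.key))
    for counter, item in enumerate(ordered, start=1):
        if item == target:
            return counter

later = Item(datetime(2024, 1, 2), datetime(2024, 1, 2), 'b')
earlier = Item(datetime(2024, 1, 1), datetime(2024, 1, 1), 'a')
assert episode_number(later, [later, earlier]) == 2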
+from shutil import rmtree +from tempfile import TemporaryDirectory from django.conf import settings from django.db.models.signals import pre_save, post_save, pre_delete, post_delete from django.dispatch import receiver @@ -11,8 +13,8 @@ from .tasks import (delete_task_by_source, delete_task_by_media, index_source_ta download_media_thumbnail, download_media_metadata, map_task_to_instance, check_source_directory_exists, download_media, rescan_media_server, download_source_images, - save_all_media_for_source, rename_media, - get_media_metadata_task, get_media_download_task) + delete_all_media_for_source, save_all_media_for_source, + rename_media, get_media_metadata_task, get_media_download_task) from .utils import delete_file, glob_quote, mkdir_p from .filtering import filter_media from .choices import Val, YouTube_SourceType @@ -25,17 +27,61 @@ def source_pre_save(sender, instance, **kwargs): try: existing_source = Source.objects.get(pk=instance.pk) except Source.DoesNotExist: - # Probably not possible? + log.debug(f'source_pre_save signal: no existing source: {sender} - {instance}') return + + args = ( str(instance.pk), ) + check_source_directory_exists.now(*args) existing_dirpath = existing_source.directory_path.resolve(strict=True) new_dirpath = instance.directory_path.resolve(strict=False) - rename_source_directory = ( - existing_dirpath != new_dirpath and - not new_dirpath.exists() - ) - if rename_source_directory: - mkdir_p(new_dirpath.parent) - existing_dirpath.rename(new_dirpath) + if existing_dirpath != new_dirpath: + path_name = lambda p: p.name + relative_dir = existing_source.directory + rd_parents = Path(relative_dir).parents + rd_parents_set = set(map(path_name, rd_parents)) + ad_parents = existing_dirpath.parents + ad_parents_set = set(map(path_name, ad_parents)) + # the names in the relative path are also in the absolute path + parents_count = len(ad_parents_set.intersection(rd_parents_set)) + work_directory = existing_dirpath + for _count in range(parents_count, 0, -1): + work_directory = work_directory.parent + if not Path(work_directory).resolve(strict=True).is_relative_to(Path(settings.DOWNLOAD_ROOT)): + work_directory = Path(settings.DOWNLOAD_ROOT) + with TemporaryDirectory(suffix=('.'+new_dirpath.name), prefix='.tmp.', dir=work_directory) as tmp_dir: + tmp_dirpath = Path(tmp_dir) + existed = None + previous = existing_dirpath.rename(tmp_dirpath / 'previous') + try: + if new_dirpath.exists(): + existed = new_dirpath.rename(tmp_dirpath / 'existed') + mkdir_p(new_dirpath.parent) + previous.rename(new_dirpath) + except Exception: + # try to preserve the directory, if anything went wrong + previous.rename(existing_dirpath) + raise + else: + existing_dirpath = previous = None + if existed and existed.is_dir(): + existed = existed.rename(new_dirpath / '.existed') + for entry_path in existed.iterdir(): + try: + target = new_dirpath / entry_path.name + if not target.exists(): + entry_path = entry_path.rename(target) + except Exception as e: + log.exception(e) + try: + existed.rmdir() + except Exception as e: + log.exception(e) + elif existed: + try: + existed = existed.rename(new_dirpath / ('.existed-' + new_dirpath.name)) + except Exception as e: + log.exception(e) + recreate_index_source_task = ( existing_source.name != instance.name or existing_source.index_schedule != instance.index_schedule @@ -46,12 +92,9 @@ def source_pre_save(sender, instance, **kwargs): verbose_name = _('Index media from source "{}"') index_source_task( str(instance.pk), - 
schedule=instance.index_schedule, repeat=instance.index_schedule, - queue=str(instance.pk), - priority=10, + schedule=instance.index_schedule, verbose_name=verbose_name.format(instance.name), - remove_existing_tasks=True ) @@ -62,14 +105,12 @@ def source_post_save(sender, instance, created, **kwargs): verbose_name = _('Check download directory exists for source "{}"') check_source_directory_exists( str(instance.pk), - priority=0, - verbose_name=verbose_name.format(instance.name) + verbose_name=verbose_name.format(instance.name), ) if instance.source_type != Val(YouTube_SourceType.PLAYLIST) and instance.copy_channel_images: download_source_images( str(instance.pk), - priority=5, - verbose_name=verbose_name.format(instance.name) + verbose_name=verbose_name.format(instance.name), ) if instance.index_schedule > 0: delete_task_by_source('sync.tasks.index_source_task', instance.pk) @@ -77,46 +118,15 @@ def source_post_save(sender, instance, created, **kwargs): verbose_name = _('Index media from source "{}"') index_source_task( str(instance.pk), - schedule=600, repeat=instance.index_schedule, - queue=str(instance.pk), - priority=10, + schedule=600, verbose_name=verbose_name.format(instance.name), - remove_existing_tasks=True - ) - # Check settings before any rename tasks are scheduled - rename_sources_setting = settings.RENAME_SOURCES or list() - create_rename_tasks = ( - ( - instance.directory and - instance.directory in rename_sources_setting - ) or - settings.RENAME_ALL_SOURCES - ) - if create_rename_tasks: - mqs = Media.objects.filter( - source=instance.pk, - downloaded=True, - ).defer( - 'media_file', - 'metadata', - 'thumb', - ) - for media in mqs: - verbose_name = _('Renaming media for: {}: "{}"') - rename_media( - str(media.pk), - queue=str(media.pk), - priority=16, - verbose_name=verbose_name.format(media.key, media.name), - remove_existing_tasks=True ) + verbose_name = _('Checking all media for source "{}"') save_all_media_for_source( str(instance.pk), - priority=9, verbose_name=verbose_name.format(instance.name), - remove_existing_tasks=True ) @@ -124,16 +134,44 @@ def source_post_save(sender, instance, created, **kwargs): def source_pre_delete(sender, instance, **kwargs): # Triggered before a source is deleted, delete all media objects to trigger # the Media models post_delete signal - for media in Media.objects.filter(source=instance): - log.info(f'Deleting media for source: {instance.name} item: {media.name}') - media.delete() + log.info(f'Deactivating source: {instance.name}') + instance.deactivate() + log.info(f'Deleting tasks for source: {instance.name}') + delete_task_by_source('sync.tasks.index_source_task', instance.pk) + delete_task_by_source('sync.tasks.check_source_directory_exists', instance.pk) + delete_task_by_source('sync.tasks.rename_all_media_for_source', instance.pk) + delete_task_by_source('sync.tasks.save_all_media_for_source', instance.pk) + # Schedule deletion of media + delete_task_by_source('sync.tasks.delete_all_media_for_source', instance.pk) + verbose_name = _('Deleting all media for source "{}"') + delete_all_media_for_source( + str(instance.pk), + str(instance.name), + verbose_name=verbose_name.format(instance.name), + ) + # Try to do it all immediately + # If this is killed, the scheduled task should do the work instead. 
+ delete_all_media_for_source.now( + str(instance.pk), + str(instance.name), + ) @receiver(post_delete, sender=Source) def source_post_delete(sender, instance, **kwargs): # Triggered after a source is deleted - log.info(f'Deleting tasks for source: {instance.name}') + source = instance + log.info(f'Deleting tasks for removed source: {source.name}') delete_task_by_source('sync.tasks.index_source_task', instance.pk) + delete_task_by_source('sync.tasks.check_source_directory_exists', instance.pk) + delete_task_by_source('sync.tasks.delete_all_media_for_source', instance.pk) + delete_task_by_source('sync.tasks.rename_all_media_for_source', instance.pk) + delete_task_by_source('sync.tasks.save_all_media_for_source', instance.pk) + # Remove the directory, if the user requested that + directory_path = Path(source.directory_path) + if (directory_path / '.to_be_removed').is_file(): + log.info(f'Deleting directory for: {source.name}: {directory_path}') + rmtree(directory_path, True) @receiver(task_failed, sender=Task) @@ -152,6 +190,7 @@ def task_task_failed(sender, task_id, completed_task, **kwargs): @receiver(post_save, sender=Media) def media_post_save(sender, instance, created, **kwargs): + media = instance # If the media is skipped manually, bail. if instance.manual_skip: return @@ -160,30 +199,50 @@ def media_post_save(sender, instance, created, **kwargs): can_download_changed = False # Reset the skip flag if the download cap has changed if the media has not # already been downloaded - if not instance.downloaded: - skip_changed = filter_media(instance) - - # Recalculate the "can_download" flag, this may - # need to change if the source specifications have been changed - if instance.metadata: - if instance.get_format_str(): - if not instance.can_download: - instance.can_download = True - can_download_changed = True - else: - if instance.can_download: - instance.can_download = False - can_download_changed = True + downloaded = instance.downloaded existing_media_metadata_task = get_media_metadata_task(str(instance.pk)) + existing_media_download_task = get_media_download_task(str(instance.pk)) + if not downloaded: + # the decision to download was already made if a download task exists + if not existing_media_download_task: + # Recalculate the "can_download" flag, this may + # need to change if the source specifications have been changed + if instance.metadata: + if instance.get_format_str(): + if not instance.can_download: + instance.can_download = True + can_download_changed = True + else: + if instance.can_download: + instance.can_download = False + can_download_changed = True + # Recalculate the "skip_changed" flag + skip_changed = filter_media(instance) + else: + # Downloaded media might need to be renamed + # Check settings before any rename tasks are scheduled + rename_sources_setting = getattr(settings, 'RENAME_SOURCES') or list() + create_rename_task = ( + ( + media.source.directory and + media.source.directory in rename_sources_setting + ) or + settings.RENAME_ALL_SOURCES + ) + if create_rename_task: + verbose_name = _('Renaming media for: {}: "{}"') + rename_media( + str(media.pk), + verbose_name=verbose_name.format(media.key, media.name), + ) + # If the media is missing metadata schedule it to be downloaded if not (instance.skip or instance.metadata or existing_media_metadata_task): log.info(f'Scheduling task to download metadata for: {instance.url}') verbose_name = _('Downloading metadata for "{}"') download_media_metadata( str(instance.pk), - priority=10, 
verbose_name=verbose_name.format(instance.pk), - remove_existing_tasks=True ) # If the media is missing a thumbnail schedule it to be downloaded (unless we are skipping this media) if not instance.thumb_file_exists: @@ -197,15 +256,10 @@ def media_post_save(sender, instance, created, **kwargs): download_media_thumbnail( str(instance.pk), thumbnail_url, - queue=str(instance.source.pk), - priority=15, verbose_name=verbose_name.format(instance.name), - remove_existing_tasks=True ) - existing_media_download_task = get_media_download_task(str(instance.pk)) # If the media has not yet been downloaded schedule it to be downloaded - downloaded = instance.downloaded - if not (instance.media_file_exists or existing_media_download_task): + if not (instance.media_file_exists or instance.filepath.exists() or existing_media_download_task): # The file was deleted after it was downloaded, skip this media. if instance.can_download and instance.downloaded: skip_changed = True != instance.skip @@ -216,10 +270,7 @@ def media_post_save(sender, instance, created, **kwargs): verbose_name = _('Downloading media for "{}"') download_media( str(instance.pk), - queue=str(instance.source.pk), - priority=15, verbose_name=verbose_name.format(instance.name), - remove_existing_tasks=True ) # Save the instance if any changes were required if skip_changed or can_download_changed: @@ -303,15 +354,3 @@ def media_post_delete(sender, instance, **kwargs): log.info(f'Deleting file for: {instance} path: {file}') delete_file(file) - # Schedule a task to update media servers - for mediaserver in MediaServer.objects.all(): - log.info(f'Scheduling media server updates') - verbose_name = _('Request media server rescan for "{}"') - rescan_media_server( - str(mediaserver.pk), - schedule=5, - priority=0, - verbose_name=verbose_name.format(mediaserver), - remove_existing_tasks=True - ) - diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index f1a40fb6..3b02e029 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -16,14 +16,17 @@ from PIL import Image from django.conf import settings from django.core.files.base import ContentFile from django.core.files.uploadedfile import SimpleUploadedFile +from django.db import DatabaseError, IntegrityError +from django.db.transaction import atomic from django.utils import timezone -from django.db.utils import IntegrityError from django.utils.translation import gettext_lazy as _ from background_task import background +from background_task.exceptions import InvalidTaskError from background_task.models import Task, CompletedTask from common.logger import log -from common.errors import NoMediaException, DownloadFailedException -from common.utils import json_serial +from common.errors import NoMediaException, NoMetadataException, DownloadFailedException +from common.utils import json_serial, remove_enclosed +from .choices import Val, TaskQueue from .models import Source, Media, MediaServer from .utils import (get_remote_image, resize_image_to_height, delete_file, write_text_file, filter_response) @@ -51,9 +54,11 @@ def map_task_to_instance(task): 'sync.tasks.download_media': Media, 'sync.tasks.download_media_metadata': Media, 'sync.tasks.save_all_media_for_source': Source, + 'sync.tasks.refesh_formats': Media, 'sync.tasks.rename_media': Media, 'sync.tasks.rename_all_media_for_source': Source, 'sync.tasks.wait_for_media_premiere': Media, + 'sync.tasks.delete_all_media_for_source': Source, } MODEL_URL_MAP = { Source: 'sync:source', @@ -104,41 +109,71 @@ def get_error_message(task): 
return error_message.split(':', 1)[1].strip() +def update_task_status(task, status): + if not task: + return False + if not task._verbose_name: + task._verbose_name = remove_enclosed( + task.verbose_name, '[', ']', ' ', + ) + if status is None: + task.verbose_name = task._verbose_name + else: + task.verbose_name = f'[{status}] {task._verbose_name}' + try: + with atomic(): + task.save(update_fields={'verbose_name'}) + except DatabaseError as e: + if 'Save with update_fields did not affect any rows.' == str(e): + pass + else: + raise + return True + + def get_source_completed_tasks(source_id, only_errors=False): ''' Returns a queryset of CompletedTask objects for a source by source ID. ''' - q = {'queue': source_id} + q = {'task_params__istartswith': f'[["{source_id}"'} if only_errors: q['failed_at__isnull'] = False return CompletedTask.objects.filter(**q).order_by('-failed_at') +def get_tasks(task_name, id=None, /, instance=None): + assert not (id is None and instance is None) + arg = str(id or instance.pk) + return Task.objects.get_task(str(task_name), args=(arg,),) + +def get_first_task(task_name, id=None, /, *, instance=None): + tqs = get_tasks(task_name, id, instance).order_by('run_at') + return tqs[0] if tqs.count() else False + def get_media_download_task(media_id): - try: - return Task.objects.get_task('sync.tasks.download_media', - args=(str(media_id),))[0] - except IndexError: - return False + return get_first_task('sync.tasks.download_media', media_id) def get_media_metadata_task(media_id): - try: - return Task.objects.get_task('sync.tasks.download_media_metadata', - args=(str(media_id),))[0] - except IndexError: - return False + return get_first_task('sync.tasks.download_media_metadata', media_id) def get_media_premiere_task(media_id): - try: - return Task.objects.get_task('sync.tasks.wait_for_media_premiere', - args=(str(media_id),))[0] - except IndexError: - return False + return get_first_task('sync.tasks.wait_for_media_premiere', media_id) + +def get_source_check_task(source_id): + return get_first_task('sync.tasks.save_all_media_for_source', source_id) + +def get_source_index_task(source_id): + return get_first_task('sync.tasks.index_source_task', source_id) + def delete_task_by_source(task_name, source_id): now = timezone.now() unlocked = Task.objects.unlocked(now) - return unlocked.filter(task_name=task_name, queue=str(source_id)).delete() + qs = unlocked.filter( + task_name=task_name, + task_params__istartswith=f'[["{source_id}"', + ) + return qs.delete() def delete_task_by_media(task_name, args): @@ -158,42 +193,79 @@ def cleanup_completed_tasks(): CompletedTask.objects.filter(run_at__lt=delta).delete() +@atomic(durable=False) +def migrate_queues(): + tqs = Task.objects.all() + qs = tqs.exclude(queue__in=TaskQueue.values) + return qs.update(queue=Val(TaskQueue.NET)) + + +def schedule_media_servers_update(): + with atomic(): + # Schedule a task to update media servers + log.info(f'Scheduling media server updates') + verbose_name = _('Request media server rescan for "{}"') + for mediaserver in MediaServer.objects.all(): + rescan_media_server( + str(mediaserver.pk), + priority=10, + verbose_name=verbose_name.format(mediaserver), + remove_existing_tasks=True, + ) + + def cleanup_old_media(): - for source in Source.objects.filter(delete_old_media=True, days_to_keep__gt=0): - delta = timezone.now() - timedelta(days=source.days_to_keep) - for media in source.media_source.filter(downloaded=True, download_date__lt=delta): - log.info(f'Deleting expired media: {source} / {media} ' 
- f'(now older than {source.days_to_keep} days / ' - f'download_date before {delta})') - # .delete() also triggers a pre_delete signal that removes the files - media.delete() + with atomic(): + for source in Source.objects.filter(delete_old_media=True, days_to_keep__gt=0): + delta = timezone.now() - timedelta(days=source.days_to_keep) + for media in source.media_source.filter(downloaded=True, download_date__lt=delta): + log.info(f'Deleting expired media: {source} / {media} ' + f'(now older than {source.days_to_keep} days / ' + f'download_date before {delta})') + with atomic(): + # .delete() also triggers a pre_delete/post_delete signals that remove files + media.delete() + schedule_media_servers_update() def cleanup_removed_media(source, videos): + if not source.delete_removed_media: + return + log.info(f'Cleaning up media no longer in source: {source}') media_objects = Media.objects.filter(source=source) for media in media_objects: matching_source_item = [video['id'] for video in videos if video['id'] == media.key] if not matching_source_item: log.info(f'{media.name} is no longer in source, removing') - media.delete() + with atomic(): + media.delete() + schedule_media_servers_update() -@background(schedule=0) +@background(schedule=dict(priority=10, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True) def index_source_task(source_id): ''' Indexes media available from a Source object. ''' + cleanup_completed_tasks() + # deleting expired media should happen any time an index task is requested + cleanup_old_media() try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the Source has been deleted, delete the task + raise InvalidTaskError(_('no such source')) from e + # An inactive Source would return an empty list for videos anyway + if not source.is_active: return # Reset any errors + # TODO: determine if this affects anything source.has_failed = False source.save() # Index the source videos = source.index_media() if not videos: + # TODO: Record this error in source.has_failed ? raise NoMediaException(f'Source "{source}" (ID: {source_id}) returned no ' f'media to index, is the source key valid? 
Check the ' f'source configuration is correct and that the source ' @@ -201,14 +273,24 @@ def index_source_task(source_id): # Got some media, update the last crawl timestamp source.last_crawl = timezone.now() source.save() - log.info(f'Found {len(videos)} media items for source: {source}') + num_videos = len(videos) + log.info(f'Found {num_videos} media items for source: {source}') fields = lambda f, m: m.get_metadata_field(f) - for video in videos: + task = get_source_index_task(source_id) + if task: + task._verbose_name = remove_enclosed( + task.verbose_name, '[', ']', ' ', + valid='0123456789/,', + end=task.verbose_name.find('Index'), + ) + tvn_format = '{:,}' + f'/{num_videos:,}' + for vn, video in enumerate(videos, start=1): # Create or update each video as a Media object key = video.get(source.key_field, None) if not key: # Video has no unique key (ID), it can't be indexed continue + update_task_status(task, tvn_format.format(vn)) try: media = Media.objects.get(key=key, source=source) except Media.DoesNotExist: @@ -222,7 +304,10 @@ def index_source_task(source_id): media.published = published_dt try: media.save() - log.debug(f'Indexed media: {source} / {media}') + except IntegrityError as e: + log.error(f'Index media failed: {source} / {media} with "{e}"') + else: + log.debug(f'Indexed media: {vn}: {source} / {media}') # log the new media instances new_media_instance = ( media.created and @@ -231,18 +316,20 @@ def index_source_task(source_id): ) if new_media_instance: log.info(f'Indexed new media: {source} / {media}') - except IntegrityError as e: - log.error(f'Index media failed: {source} / {media} with "{e}"') - # Tack on a cleanup of old completed tasks - cleanup_completed_tasks() - # Tack on a cleanup of old media - cleanup_old_media() - if source.delete_removed_media: - log.info(f'Cleaning up media no longer in source: {source}') - cleanup_removed_media(source, videos) + log.info(f'Scheduling task to download metadata for: {media.url}') + verbose_name = _('Downloading metadata for "{}"') + download_media_metadata( + str(media.pk), + priority=20, + verbose_name=verbose_name.format(media.pk), + ) + # Reset task.verbose_name to the saved value + update_task_status(task, None) + # Cleanup of media no longer available from the source + cleanup_removed_media(source, videos) -@background(schedule=0) +@background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.FS)) def check_source_directory_exists(source_id): ''' Checks the output directory for a source exists and is writable, if it does @@ -251,17 +338,17 @@ def check_source_directory_exists(source_id): ''' try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the Source has been deleted, delete the task - return + raise InvalidTaskError(_('no such source')) from e # Check the source output directory exists if not source.directory_exists(): - # Try and create it + # Try to create it log.info(f'Creating directory: {source.directory_path}') source.make_directory() -@background(schedule=0) +@background(schedule=dict(priority=5, run_at=10), queue=Val(TaskQueue.NET)) def download_source_images(source_id): ''' Downloads an image and save it as a local thumbnail attached to a @@ -269,11 +356,11 @@ def download_source_images(source_id): ''' try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the source no longer exists, do nothing log.error(f'Task 
download_source_images(pk={source_id}) called but no ' f'source exists with ID: {source_id}') - return + raise InvalidTaskError(_('no such source')) from e avatar, banner = source.get_image_url log.info(f'Thumbnail URL for source with ID: {source_id} / {source} ' f'Avatar: {avatar} ' @@ -311,18 +398,18 @@ def download_source_images(source_id): log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}') -@background(schedule=0) +@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True) def download_media_metadata(media_id): ''' Downloads the metadata for a media item. ''' try: media = Media.objects.get(pk=media_id) - except Media.DoesNotExist: + except Media.DoesNotExist as e: # Task triggered but the media no longer exists, do nothing log.error(f'Task download_media_metadata(pk={media_id}) called but no ' f'media exists with ID: {media_id}') - return + raise InvalidTaskError(_('no such media')) from e if media.manual_skip: log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.') return @@ -331,7 +418,7 @@ def download_media_metadata(media_id): metadata = media.index_metadata() except YouTubeError as e: e_str = str(e) - log_exception = True + raise_exception = True if ': Premieres in ' in e_str: now = timezone.now() published_datetime = None @@ -360,16 +447,13 @@ def download_media_metadata(media_id): verbose_name = _('Waiting for the premiere of "{}" at: {}') wait_for_media_premiere( str(media.pk), - priority=0, - queue=str(media.pk), repeat=Task.HOURLY, repeat_until = published_datetime + timedelta(hours=1), verbose_name=verbose_name.format(media.key, published_datetime.isoformat(' ', 'seconds')), - remove_existing_tasks=True, ) - log_exception = False - if log_exception: - log.exception(e) + raise_exception = False + if raise_exception: + raise log.debug(str(e)) return response = metadata @@ -398,7 +482,7 @@ def download_media_metadata(media_id): f'{source} / {media}: {media_id}') -@background(schedule=0) +@background(schedule=dict(priority=15, run_at=10), queue=Val(TaskQueue.NET), remove_existing_tasks=True) def download_media_thumbnail(media_id, url): ''' Downloads an image from a URL and save it as a local thumbnail attached to a @@ -406,10 +490,10 @@ def download_media_thumbnail(media_id, url): ''' try: media = Media.objects.get(pk=media_id) - except Media.DoesNotExist: + except Media.DoesNotExist as e: # Task triggered but the media no longer exists, do nothing - return - if media.skip: + raise InvalidTaskError(_('no such media')) from e + if media.skip or media.manual_skip: # Media was toggled to be skipped after the task was scheduled log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' f'it is now marked to be skipped, not downloading thumbnail') @@ -436,36 +520,43 @@ def download_media_thumbnail(media_id, url): return True -@background(schedule=0) +@background(schedule=dict(priority=15, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True) def download_media(media_id): ''' Downloads the media to disk and attaches it to the Media instance. 
''' try: media = Media.objects.get(pk=media_id) - except Media.DoesNotExist: + except Media.DoesNotExist as e: # Task triggered but the media no longer exists, do nothing - return - if media.skip: - # Media was toggled to be skipped after the task was scheduled - log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' - f'it is now marked to be skipped, not downloading') - return - downloaded_file_exists = ( - media.media_file_exists or - media.filepath.exists() - ) - if media.downloaded and downloaded_file_exists: - # Media has been marked as downloaded before the download_media task was fired, - # skip it - log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' - f'it has already been marked as downloaded, not downloading again') - return + raise InvalidTaskError(_('no such media')) from e if not media.source.download_media: log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' f'the source {media.source} has since been marked to not download, ' f'not downloading') return + if media.skip or media.manual_skip: + # Media was toggled to be skipped after the task was scheduled + log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' + f'it is now marked to be skipped, not downloading') + return + # metadata is required to generate the proper filepath + if not media.has_metadata: + raise NoMetadataException('Metadata is not yet available.') + downloaded_file_exists = ( + media.downloaded and + media.has_metadata and + ( + media.media_file_exists or + media.filepath.exists() + ) + ) + if downloaded_file_exists: + # Media has been marked as downloaded before the download_media task was fired, + # skip it + log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' + f'it has already been marked as downloaded, not downloading again') + return max_cap_age = media.source.download_cap_date published = media.published if max_cap_age and published: @@ -538,43 +629,35 @@ def download_media(media_id): log.warn(f'A permissions problem occured when writing the new media NFO file: {e.msg}') pass # Schedule a task to update media servers - for mediaserver in MediaServer.objects.all(): - log.info(f'Scheduling media server updates') - verbose_name = _('Request media server rescan for "{}"') - rescan_media_server( - str(mediaserver.pk), - queue=str(media.source.pk), - priority=0, - verbose_name=verbose_name.format(mediaserver), - remove_existing_tasks=True - ) + schedule_media_servers_update() else: # Expected file doesn't exist on disk err = (f'Failed to download media: {media} (UUID: {media.pk}) to disk, ' f'expected outfile does not exist: {filepath}') log.error(err) # Try refreshing formats - media.refresh_formats + if media.has_metadata: + media.refresh_formats # Raising an error here triggers the task to be re-attempted (or fail) raise DownloadFailedException(err) -@background(schedule=0) +@background(schedule=dict(priority=0, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True) def rescan_media_server(mediaserver_id): ''' Attempts to request a media rescan on a remote media server. 
''' try: mediaserver = MediaServer.objects.get(pk=mediaserver_id) - except MediaServer.DoesNotExist: + except MediaServer.DoesNotExist as e: # Task triggered but the media server no longer exists, do nothing - return + raise InvalidTaskError(_('no such server')) from e # Request an rescan / update log.info(f'Updating media server: {mediaserver}') mediaserver.update() -@background(schedule=0, remove_existing_tasks=True) +@background(schedule=dict(priority=25, run_at=600), queue=Val(TaskQueue.FS), remove_existing_tasks=True) def save_all_media_for_source(source_id): ''' Iterates all media items linked to a source and saves them to @@ -584,68 +667,116 @@ def save_all_media_for_source(source_id): ''' try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the source no longer exists, do nothing log.error(f'Task save_all_media_for_source(pk={source_id}) called but no ' f'source exists with ID: {source_id}') - return + raise InvalidTaskError(_('no such source')) from e - already_saved = set() + saved_later = set() mqs = Media.objects.filter(source=source) + task = get_source_check_task(source_id) refresh_qs = mqs.filter( can_download=False, skip=False, manual_skip=False, downloaded=False, + metadata__isnull=False, ) - for media in refresh_qs: - try: - media.refresh_formats - except YouTubeError as e: - log.debug(f'Failed to refresh formats for: {source} / {media.key}: {e!s}') - pass - else: - media.save() - already_saved.add(media.uuid) + if task: + task._verbose_name = remove_enclosed( + task.verbose_name, '[', ']', ' ', + valid='0123456789/,', + end=task.verbose_name.find('Check'), + ) + tvn_format = '1/{:,}' + f'/{refresh_qs.count():,}' + for mn, media in enumerate(refresh_qs, start=1): + update_task_status(task, tvn_format.format(mn)) + refesh_formats( + str(media.pk), + verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"', + ) + saved_later.add(media.uuid) # Trigger the post_save signal for each media item linked to this source as various # flags may need to be recalculated - for media in mqs: - if media.uuid not in already_saved: + tvn_format = '2/{:,}' + f'/{mqs.count():,}' + for mn, media in enumerate(mqs, start=1): + if media.uuid not in saved_later: + update_task_status(task, tvn_format.format(mn)) + with atomic(): + media.save() + # Reset task.verbose_name to the saved value + update_task_status(task, None) + + +@background(schedule=dict(priority=10, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=True) +def refesh_formats(media_id): + try: + media = Media.objects.get(pk=media_id) + except Media.DoesNotExist as e: + raise InvalidTaskError(_('no such media')) from e + try: + media.refresh_formats + except YouTubeError as e: + log.debug(f'Failed to refresh formats for: {media.source} / {media.key}: {e!s}') + pass + else: + with atomic(): media.save() -@background(schedule=0, remove_existing_tasks=True) +@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.FS), remove_existing_tasks=True) def rename_media(media_id): try: media = Media.objects.defer('metadata', 'thumb').get(pk=media_id) - except Media.DoesNotExist: - return + except Media.DoesNotExist as e: + raise InvalidTaskError(_('no such media')) from e media.rename_files() -@background(schedule=0, remove_existing_tasks=True) +@background(schedule=dict(priority=20, run_at=300), queue=Val(TaskQueue.FS), remove_existing_tasks=True) +@atomic(durable=True) def rename_all_media_for_source(source_id): try: 
source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the source no longer exists, do nothing log.error(f'Task rename_all_media_for_source(pk={source_id}) called but no ' f'source exists with ID: {source_id}') + raise InvalidTaskError(_('no such source')) from e + # Check that the settings allow renaming + rename_sources_setting = getattr(settings, 'RENAME_SOURCES') or list() + create_rename_tasks = ( + ( + source.directory and + source.directory in rename_sources_setting + ) or + getattr(settings, 'RENAME_ALL_SOURCES', False) + ) + if not create_rename_tasks: return - for media in Media.objects.filter(source=source): - media.rename_files() + mqs = Media.objects.all().defer( + 'metadata', + 'thumb', + ).filter( + source=source, + downloaded=True, + ) + for media in mqs: + with atomic(): + media.rename_files() -@background(schedule=0, remove_existing_tasks=True) +@background(schedule=dict(priority=0, run_at=60), queue=Val(TaskQueue.DB), remove_existing_tasks=True) def wait_for_media_premiere(media_id): hours = lambda td: 1+int((24*td.days)+(td.seconds/(60*60))) try: media = Media.objects.get(pk=media_id) - except Media.DoesNotExist: - return - if media.metadata: + except Media.DoesNotExist as e: + raise InvalidTaskError(_('no such media')) from e + if media.has_metadata: return now = timezone.now() if media.published < now: @@ -657,4 +788,27 @@ def wait_for_media_premiere(media_id): media.manual_skip = True media.title = _(f'Premieres in {hours(media.published - now)} hours') media.save() + task = get_media_premiere_task(media_id) + if task: + update_task_status(task, f'available in {hours(media.published - now)} hours') + +@background(schedule=dict(priority=1, run_at=300), queue=Val(TaskQueue.FS), remove_existing_tasks=False) +def delete_all_media_for_source(source_id, source_name): + source = None + try: + source = Source.objects.get(pk=source_id) + except Source.DoesNotExist as e: + # Task triggered but the source no longer exists, do nothing + log.error(f'Task delete_all_media_for_source(pk={source_id}) called but no ' + f'source exists with ID: {source_id}') + raise InvalidTaskError(_('no such source')) from e + mqs = Media.objects.all().defer( + 'metadata', + ).filter( + source=source or source_id, + ) + for media in mqs: + log.info(f'Deleting media for source: {source_name} item: {media.name}') + with atomic(): + media.delete() diff --git a/tubesync/sync/templates/sync/dashboard.html b/tubesync/sync/templates/sync/dashboard.html index f25f36fc..23e1cdb2 100644 --- a/tubesync/sync/templates/sync/dashboard.html +++ b/tubesync/sync/templates/sync/dashboard.html @@ -24,9 +24,9 @@
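The tasks.py hunks above all converge on one convention: each task declares a schedule dict (a priority plus a run_at delay in seconds) instead of the old bare schedule=0, is pinned to a named worker queue via Val(TaskQueue.*), and raises InvalidTaskError rather than silently returning when its database row has gone away. A minimal sketch of that pattern follows; the import locations and the example_media_task name are illustrative assumptions, not part of the diff, and the no-retry behaviour of InvalidTaskError is presumed from how the tasks above use it.

    from background_task import background
    from background_task.exceptions import InvalidTaskError
    from django.utils.translation import gettext_lazy as _

    from .choices import Val, TaskQueue   # assumed location of these helpers
    from .models import Media


    @background(
        # replaces the old bare `schedule=0`: run_at is the delay in seconds
        # before the task becomes runnable, priority orders pending tasks
        schedule=dict(priority=10, run_at=30),
        # pin the task to a named queue (NET, FS or DB in this diff) so
        # workers can be split by the kind of work they perform
        queue=Val(TaskQueue.NET),
        # collapse duplicate pending tasks created with the same arguments
        remove_existing_tasks=True,
    )
    def example_media_task(media_id):
        # hypothetical task, shown only to illustrate the shared pattern
        try:
            media = Media.objects.get(pk=media_id)
        except Media.DoesNotExist as e:
            # raising InvalidTaskError (instead of returning) flags the task
            # as one that can never succeed, so it is dropped, not retried
            raise InvalidTaskError(_('no such media')) from e
        ...  # the actual work on `media` would go here
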
@@ -35,9 +35,9 @@ @@ -46,9 +46,9 @@ @@ -99,6 +99,18 @@ +Running tasks are tasks which are currently being worked on right now.
@@ -25,7 +25,7 @@ {% for task in running %} {{ task }}Tasks which generated an error are shown here. Tasks are retried a couple of times, so if there was an intermittent error such as a download got interrupted @@ -49,14 +49,16 @@ Task will be retried at {{ task.run_at|date:'Y-m-d H:i:s' }} {% empty %} - There are no tasks with errors. + There are no tasks with errors on this page. {% endfor %}
Tasks which are scheduled to run in the future or are waiting in a queue to be processed. They can be waiting for an available worker to run immediately, or @@ -70,11 +72,12 @@ Task will run {% if task.run_now %}immediately{% else %}at {{ task.run_at|date:'Y-m-d H:i:s' }}{% endif %} {% empty %} - There are no scheduled tasks. + There are no scheduled tasks on this page. {% endfor %}
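One small detail from the wait_for_media_premiere change above is easy to misread: its hours() lambda truncates the timedelta to whole hours and then adds one, so even an exact multiple of an hour reports one hour more. A standalone check (plain Python, outside Django; the sample values are illustrative):

    from datetime import timedelta

    # same expression as the lambda in wait_for_media_premiere()
    hours = lambda td: 1 + int((24 * td.days) + (td.seconds / (60 * 60)))

    print(hours(timedelta(minutes=90)))          # 2  -> 1.5h truncated to 1, plus 1
    print(hours(timedelta(days=1, minutes=30)))  # 25 -> 24.5h truncated to 24, plus 1
    print(hours(timedelta(hours=3)))             # 4  -> exactly 3h still becomes 4
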