tubesync/tubesync/sync/youtube.py
meeb cb36f44235
Some checks failed
CI / info (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.8) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / containerise (push) Has been cancelled
Merge pull request #1017 from tcely/patch-3
Let `missing_pot` formats download after testing
2025-05-10 15:46:12 +10:00

444 lines
16 KiB
Python

'''
Wrapper for the yt-dlp library. Used so if there are any library interface
updates we only need to update them in one place.
'''
import os
from collections import namedtuple
from common.logger import log
from copy import deepcopy
from pathlib import Path
from tempfile import TemporaryDirectory
from urllib.parse import urlsplit, parse_qs
from django.conf import settings
from .choices import Val, FileExtension
from .hooks import postprocessor_hook, progress_hook
from .utils import mkdir_p
import yt_dlp
import yt_dlp.patch.check_thumbnails
import yt_dlp.patch.fatal_http_errors
from yt_dlp.utils import remove_end, shell_quote, OUTTMPL_TYPES
# Module-level defaults for every yt-dlp invocation, seeded from Django settings.
_defaults = getattr(settings, 'YOUTUBE_DEFAULTS', {})
# Optional persistent cache directory for yt-dlp
_youtubedl_cachedir = getattr(settings, 'YOUTUBE_DL_CACHEDIR', None)
if _youtubedl_cachedir:
    _youtubedl_cachedir = str(_youtubedl_cachedir)
    _defaults['cachedir'] = _youtubedl_cachedir
# Optional temporary download directory for yt-dlp
_youtubedl_tempdir = getattr(settings, 'YOUTUBE_DL_TEMPDIR', None)
if _youtubedl_tempdir:
    _youtubedl_tempdir = str(_youtubedl_tempdir)
    _youtubedl_tempdir_path = Path(_youtubedl_tempdir)
    # create the directory if needed and drop a '.ignore' marker file in it
    mkdir_p(_youtubedl_tempdir_path)
    (_youtubedl_tempdir_path / '.ignore').touch(exist_ok=True)
    # merge the temp dir into any user-configured 'paths' mapping
    _paths = _defaults.get('paths', {})
    _paths.update({ 'temp': _youtubedl_tempdir, })
    _defaults['paths'] = _paths
class YouTubeError(yt_dlp.utils.DownloadError):
    '''
    Single wrapper exception raised for any failure that originates in the
    yt-dlp (youtube-dl) library, so callers only catch one error type.
    '''
    pass
def get_yt_opts():
    '''
    Returns a fresh copy of the default yt-dlp options, adding a cookie file
    option when a cookies.txt file is present at the configured location.
    '''
    yt_opts = deepcopy(_defaults)
    cookies = settings.COOKIES_FILE
    if cookies.is_file():
        cookie_file_path = str(cookies.resolve())
        log.info(f'[youtube-dl] using cookies.txt from: {cookie_file_path}')
        yt_opts['cookiefile'] = cookie_file_path
    return yt_opts
def get_channel_id(url):
    '''
    Returns the channel ID for a channel or playlist URL by extracting the
    first playlist entry with yt-dlp. Raises YouTubeError on any failure.
    '''
    # equivalent of:
    # yt-dlp --simulate --no-check-formats --playlist-items 1
    # --print 'pre_process:%(playlist_channel_id,playlist_id,channel_id)s'
    ydl_opts = get_yt_opts()
    ydl_opts.update({
        'skip_download': True,
        'simulate': True,
        'logger': log,
        'extract_flat': True,  # Change to False to get detailed info
        'check_formats': False,
        'playlist_items': '1',
    })
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, download=False)
        except yt_dlp.utils.DownloadError as e:
            raise YouTubeError(f'Failed to extract channel ID for "{url}": {e}') from e
        try:
            return info['channel_id']
        except Exception as e:
            raise YouTubeError(f'Failed to extract channel ID for "{url}": {e}') from e
def get_channel_image_info(url):
    '''
    Returns an (avatar_url, banner_url) tuple for a channel URL, taken from
    the 'avatar_uncropped' and 'banner_uncropped' thumbnail entries. Either
    value may be None when the channel does not provide it. Raises
    YouTubeError when yt-dlp fails to extract the channel info.
    '''
    opts = get_yt_opts()
    opts.update({
        'skip_download': True,
        'simulate': True,
        'logger': log,
        'extract_flat': True,  # Change to False to get detailed info
        'check_formats': False,
    })
    with yt_dlp.YoutubeDL(opts) as y:
        try:
            response = y.extract_info(url, download=False)
            avatar_url = None
            banner_url = None
            for thumbnail in response['thumbnails']:
                if thumbnail['id'] == 'avatar_uncropped':
                    avatar_url = thumbnail['url']
                if thumbnail['id'] == 'banner_uncropped':
                    banner_url = thumbnail['url']
                # stop scanning once both images have been found
                if banner_url is not None and avatar_url is not None:
                    break
            return avatar_url, banner_url
        except yt_dlp.utils.DownloadError as e:
            raise YouTubeError(f'Failed to extract channel info for "{url}": {e}') from e
def _subscriber_only(msg='', response=None):
if response is None:
# process msg only
msg = str(msg)
if 'access to members-only content' in msg:
return True
if ': Join this channel' in msg:
return True
if 'Join this YouTube channel' in msg:
return True
else:
# ignore msg entirely
if not isinstance(response, dict):
raise TypeError(f'response must be a dict, got "{type(response)}" instead')
if 'availability' not in response.keys():
return False
# check for the specific expected value
return 'subscriber_only' == response.get('availability')
return False
def get_media_info(url, /, *, days=None, info_json=None):
    '''
    Extracts information from a YouTube URL and returns it as a dict. For a channel
    or playlist this returns a dict of all the videos on the channel or playlist
    as well as associated metadata.

    days: optionally restrict extraction to roughly the last `days` days.
    info_json: optional path used as the destination for the written info.json.
    Raises YouTubeError when extraction fails or returns no metadata.
    '''
    # Build an optional date range start like 'yesterday-7days'
    start = None
    if days is not None:
        try:
            days = int(str(days), 10)
        except Exception as e:
            # a non-numeric value disables the date range filter
            days = None
        start = (
            f'yesterday-{days!s}days' if days else None
        )
    opts = get_yt_opts()
    default_opts = yt_dlp.parse_options([]).options
    class NoDefaultValue: pass # a unique Singleton, that may be checked for later
    # user_set(key, mapping[, default]): mapping[key] when the user set it,
    # otherwise `default` (or the NoDefaultValue sentinel)
    user_set = lambda k, d, default=NoDefaultValue: d[k] if k in d.keys() else default
    default_paths = user_set('paths', default_opts.__dict__, dict())
    paths = user_set('paths', opts, default_paths)
    if 'temp' in paths:
        # use a per-call temporary sub-directory under the configured temp path
        temp_dir_obj = TemporaryDirectory(prefix='.yt_dlp-', dir=paths['temp'])
        temp_dir_path = Path(temp_dir_obj.name)
        # '.ignore' marks the directory so other tooling skips it
        (temp_dir_path / '.ignore').touch(exist_ok=True)
        paths.update({
            'temp': str(temp_dir_path),
        })
    try:
        info_json_path = Path(info_json).resolve(strict=False)
    except (RuntimeError, TypeError):
        # info_json was None or not path-like; skip setting an infojson path
        pass
    else:
        paths.update({
            'infojson': user_set('infojson', paths, str(info_json_path))
        })
    default_postprocessors = user_set('postprocessors', default_opts.__dict__, list())
    postprocessors = user_set('postprocessors', opts, default_postprocessors)
    # after the playlist is processed, notify the app via a helper script
    postprocessors.append(dict(
        key='Exec',
        when='playlist',
        exec_cmd="/usr/bin/env bash /app/full_playlist.sh '%(id)s' '%(playlist_count)d'",
    ))
    cache_directory_path = Path(user_set('cachedir', opts, '/dev/shm'))
    playlist_infojson = 'postprocessor_[%(id)s]_%(n_entries)d_%(playlist_count)d_temp'
    outtmpl = dict(
        default='',
        infojson='%(extractor)s/%(id)s.%(ext)s' if paths.get('infojson') else '',
        pl_infojson=f'{cache_directory_path}/infojson/playlist/{playlist_infojson}.%(ext)s',
    )
    # every remaining output template type is explicitly blanked out
    for k in OUTTMPL_TYPES.keys():
        outtmpl.setdefault(k, '')
    opts.update({
        'ignoreerrors': False, # explicitly set this to catch exceptions
        'ignore_no_formats_error': False, # we must fail first to try again with this enabled
        'skip_download': True,
        'simulate': False,
        'logger': log,
        'extract_flat': True,
        'allow_playlist_files': True,
        'check_formats': True,
        'check_thumbnails': False,
        'clean_infojson': False,
        'daterange': yt_dlp.utils.DateRange(start=start),
        'extractor_args': {
            'youtube': {'formats': ['missing_pot']},
            'youtubetab': {'approximate_date': ['true']},
        },
        'outtmpl': outtmpl,
        'overwrites': True,
        'paths': paths,
        'postprocessors': postprocessors,
        'skip_unavailable_fragments': False,
        'sleep_interval_requests': 1,
        'verbose': True if settings.DEBUG else False,
        'writeinfojson': True,
    })
    if settings.BACKGROUND_TASK_RUN_ASYNC:
        # slow request pacing proportionally to the number of worker threads
        opts.update({
            'sleep_interval_requests': 2 * settings.BACKGROUND_TASK_ASYNC_THREADS,
        })
    if start:
        log.debug(f'get_media_info: used date range: {opts["daterange"]} for URL: {url}')
    response = {}
    with yt_dlp.YoutubeDL(opts) as y:
        try:
            response = y.extract_info(url, download=False)
        except yt_dlp.utils.DownloadError as e:
            # only retry when the failure looks like members-only content
            if not _subscriber_only(msg=e.msg):
                raise YouTubeError(f'Failed to extract_info for "{url}": {e}') from e
            # adjust options and try again
            opts.update({'ignore_no_formats_error': True,})
            with yt_dlp.YoutubeDL(opts) as yy:
                try:
                    response = yy.extract_info(url, download=False)
                except yt_dlp.utils.DownloadError as ee:
                    raise YouTubeError(f'Failed (again) to extract_info for "{url}": {ee}') from ee
            # validate the response is what we expected
            if not _subscriber_only(response=response):
                response = {}
    if not response:
        raise YouTubeError(f'Failed to extract_info for "{url}": No metadata was '
                           f'returned by youtube-dl, check for error messages in the '
                           f'logs above. This task will be retried later with an '
                           f'exponential backoff.')
    return response
# Yes, this looks odd. But, it works.
# It works without also causing indentation problems.
# I'll take ease of editing, thanks.
def download_media(
    url, media_format, extension, output_file,
    info_json, sponsor_categories=None,
    embed_thumbnail=False, embed_metadata=False,
    skip_sponsors=True, write_subtitles=False,
    auto_subtitles=False, sub_langs='en'
):
    '''
    Downloads a YouTube URL to a file on disk.

    Builds the full yt-dlp option dict (post processors, SponsorBlock,
    audio extraction, temp/home paths, codec arguments, hooks) and then
    runs the download. Raises YouTubeError when yt-dlp reports a
    DownloadError.
    '''
    opts = get_yt_opts()
    default_opts = yt_dlp.parse_options([]).options
    pp_opts = deepcopy(default_opts)
    # We fake up this option to make it easier for the user to add post processors.
    postprocessors = opts.get('add_postprocessors', pp_opts.add_postprocessors)
    if isinstance(postprocessors, str):
        # NAME1[:ARGS], NAME2[:ARGS]
        # ARGS are a semicolon ";" delimited list of NAME=VALUE
        #
        # This means that "," cannot be present in NAME or VALUE.
        # If you need to support that, then use the 'postprocessors' key,
        # in your settings dictionary instead.
        _postprocessor_opts_parser = lambda key, val='': (
            *(
                item.split('=', 1) for item in (val.split(';') if val else [])
            ),
            ( 'key', remove_end(key, 'PP'), )
        )
        postprocessors = list(
            dict(
                _postprocessor_opts_parser( *val.split(':', 1) )
            ) for val in map(str.strip, postprocessors.split(','))
        )
    if not isinstance(postprocessors, list):
        postprocessors = list()
    # Add any post processors configured the 'hard' way also.
    postprocessors.extend( opts.get('postprocessors', list()) )
    pp_opts.__dict__.update({
        'add_postprocessors': postprocessors,
        'embedthumbnail': embed_thumbnail,
        'addmetadata': embed_metadata,
        'addchapters': True,
        'embed_infojson': False,
        'writethumbnail': False,
        'force_keyframes_at_cuts': True,
        'sponskrub': False,
    })
    pp_opts.exec_cmd.update(
        opts.get('exec_cmd', default_opts.exec_cmd)
    )
    if skip_sponsors:
        # Let yt_dlp convert from human for us.
        pp_opts.sponsorblock_mark = yt_dlp.parse_options(
            ['--sponsorblock-mark=all,-chapter']
        ).options.sponsorblock_mark
        pp_opts.sponsorblock_remove.update(sponsor_categories or {})
    # Enable audio extraction for audio-only extensions
    audio_exts = set(Val(
        FileExtension.M4A,
        FileExtension.OGG,
    ))
    if extension in audio_exts:
        pp_opts.extractaudio = True
        pp_opts.nopostoverwrites = False
        # The ExtractAudio post processor can change the extension.
        # This post processor is to change the final filename back
        # to what we are expecting it to be.
        final_path = Path(output_file)
        try:
            final_path = final_path.resolve(strict=True)
        except FileNotFoundError:
            # This is very likely the common case
            final_path = Path(output_file).resolve(strict=False)
        expected_file = shell_quote(str(final_path))
        cmds = pp_opts.exec_cmd.get('after_move', list())
        # It is important that we use a tuple for strings.
        # Otherwise, list adds each character instead.
        # That last comma is really necessary!
        cmds += (
            f'test -f {expected_file} || '
            'mv -T -u -- %(filepath,_filename|)q '
            f'{expected_file}',
        )
        # assignment is the quickest way to cover both 'get' cases
        pp_opts.exec_cmd['after_move'] = cmds
    elif '+' not in media_format:
        # a single (non-merged) video format: remux into the requested container
        pp_opts.remuxvideo = extension
    ytopts = {
        'format': media_format,
        'final_ext': extension,
        'merge_output_format': extension,
        'outtmpl': os.path.basename(output_file),
        'remuxvideo': pp_opts.remuxvideo,
        'quiet': False if settings.DEBUG else True,
        'verbose': True if settings.DEBUG else False,
        'noprogress': None if settings.DEBUG else True,
        'writeinfojson': info_json,
        'writesubtitles': write_subtitles,
        'writeautomaticsub': auto_subtitles,
        'subtitleslangs': sub_langs.split(','),
        'writethumbnail': embed_thumbnail,
        'check_formats': None,
        'overwrites': None,
        'skip_unavailable_fragments': False,
        'sleep_interval': 10,
        'max_sleep_interval': min(20*60, max(60, settings.DOWNLOAD_MEDIA_DELAY)),
        'sleep_interval_requests': 1 + (2 * settings.BACKGROUND_TASK_ASYNC_THREADS),
        'extractor_args': opts.get('extractor_args', dict()),
        'paths': opts.get('paths', dict()),
        'postprocessor_args': opts.get('postprocessor_args', dict()),
        'postprocessor_hooks': opts.get('postprocessor_hooks', list()),
        'progress_hooks': opts.get('progress_hooks', list()),
    }
    output_dir = os.path.dirname(output_file)
    temp_dir_parent = output_dir
    temp_dir_prefix = '.yt_dlp-'
    if 'temp' in ytopts['paths']:
        # NOTE(review): assumes the URL carries a 'v' query parameter;
        # .get('v') returns None otherwise and .pop() would raise — confirm callers
        v_key = parse_qs(urlsplit(url).query).get('v').pop()
        temp_dir_parent = ytopts['paths']['temp']
        temp_dir_prefix = f'{temp_dir_prefix}{v_key}-'
    temp_dir_obj = TemporaryDirectory(prefix=temp_dir_prefix,dir=temp_dir_parent)
    # a '.clean' marker in the parent opts into the auto-removed temp dir;
    # otherwise downloads share the parent directory directly
    if temp_dir_obj and (Path(temp_dir_parent) / '.clean').exists():
        temp_dir_path = Path(temp_dir_obj.name)
    else:
        temp_dir_path = Path(temp_dir_parent)
    (temp_dir_path / '.ignore').touch(exist_ok=True)
    ytopts['paths'].update({
        'home': str(output_dir),
        'temp': str(temp_dir_path),
    })
    # Allow download of formats that tested good with 'missing_pot'
    youtube_ea_dict = ytopts['extractor_args'].get('youtube', dict())
    formats_list = youtube_ea_dict.get('formats', list())
    if 'missing_pot' not in formats_list:
        formats_list += ('missing_pot',)
    youtube_ea_dict.update({
        'formats': formats_list,
    })
    ytopts['extractor_args'].update({
        'youtube': youtube_ea_dict,
    })
    # wire in the app's progress / post-processing hook callbacks, if any
    postprocessor_hook_func = postprocessor_hook.get('function', None)
    if postprocessor_hook_func:
        ytopts['postprocessor_hooks'].append(postprocessor_hook_func)
    progress_hook_func = progress_hook.get('function', None)
    if progress_hook_func:
        ytopts['progress_hooks'].append(progress_hook_func)
    # infer ffmpeg codec arguments from markers in the output filename
    codec_options = list()
    ofn = ytopts['outtmpl']
    if 'av1-' in ofn:
        codec_options.extend(['-c:v', 'libsvtav1', '-preset', '8', '-crf', '35'])
    elif 'vp9-' in ofn:
        codec_options.extend(['-c:v', 'libvpx-vp9', '-b:v', '0', '-crf', '31', '-row-mt', '1', '-tile-columns', '2'])
    if '-opus' in ofn:
        codec_options.extend(['-c:a', 'libopus'])
    # only set our codec args when the user has not configured their own
    # NOTE(review): indexing with 'modifychapters+ffmpeg' raises KeyError when
    # postprocessor_args is non-empty but lacks that key — confirm intended
    set_ffmpeg_codec = not (
        ytopts['postprocessor_args'] and
        ytopts['postprocessor_args']['modifychapters+ffmpeg']
    )
    if set_ffmpeg_codec and codec_options:
        ytopts['postprocessor_args'].update({
            'modifychapters+ffmpeg': codec_options,
        })
    # Provide the user control of 'overwrites' in the post processors.
    pp_opts.overwrites = opts.get(
        'overwrites',
        ytopts.get(
            'overwrites',
            default_opts.overwrites,
        ),
    )
    # Create the post processors list.
    # It already included user configured post processors as well.
    ytopts['postprocessors'] = list(yt_dlp.get_postprocessors(pp_opts))
    opts.update(ytopts)
    with yt_dlp.YoutubeDL(opts) as y:
        try:
            return y.download([url])
        except yt_dlp.utils.DownloadError as e:
            raise YouTubeError(f'Failed to download for "{url}": {e}') from e
    # unreachable: the try block either returns or raises
    return False