[dash,youtube] Download live from start to end (#888)

* Add option `--live-from-start` to enable downloading live videos from start
* Add key `is_from_start` in formats to identify formats (of live videos) that downloads from start
* [dash] Create protocol `http_dash_segments_generator` that allows a function to be passed instead of fragments
* [fragment] Allow multiple live dash formats to download simultaneously
* [youtube] Implement fragment re-fetching for the live dash formats
* [youtube] Re-extract dash manifest every 5 hours (manifest expires in 6hrs)
* [postprocessor/ffmpeg] Add `FFmpegFixupDuplicateMoovPP` to fixup duplicated moov atoms

Known issue: Ctrl+C doesn't work on Windows when downloading multiple formats

Closes #1521
Authored by: nao20010128nao, pukkandan
This commit is contained in:
The Hatsune Daishi
2021-12-20 15:06:46 +09:00
committed by GitHub
parent c031b0414c
commit adbc4ec4bb
15 changed files with 355 additions and 100 deletions

View File

@@ -12,10 +12,15 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N
info_copy = info_dict.copy()
info_copy['to_stdout'] = to_stdout
downloaders = [_get_suitable_downloader(info_copy, proto, params, default)
for proto in (protocol or info_copy['protocol']).split('+')]
protocols = (protocol or info_copy['protocol']).split('+')
downloaders = [_get_suitable_downloader(info_copy, proto, params, default) for proto in protocols]
if set(downloaders) == {FFmpegFD} and FFmpegFD.can_merge_formats(info_copy, params):
return FFmpegFD
elif (set(downloaders) == {DashSegmentsFD}
and not (to_stdout and len(protocols) > 1)
and set(protocols) == {'http_dash_segments_generator'}):
return DashSegmentsFD
elif len(downloaders) == 1:
return downloaders[0]
return None
@@ -49,6 +54,7 @@ PROTOCOL_MAP = {
'rtsp': RtspFD,
'f4m': F4mFD,
'http_dash_segments': DashSegmentsFD,
'http_dash_segments_generator': DashSegmentsFD,
'ism': IsmFD,
'mhtml': MhtmlFD,
'niconico_dmc': NiconicoDmcFD,
@@ -63,6 +69,7 @@ def shorten_protocol_name(proto, simplify=False):
'm3u8_native': 'm3u8_n',
'rtmp_ffmpeg': 'rtmp_f',
'http_dash_segments': 'dash',
'http_dash_segments_generator': 'dash_g',
'niconico_dmc': 'dmc',
'websocket_frag': 'WSfrag',
}
@@ -71,6 +78,7 @@ def shorten_protocol_name(proto, simplify=False):
'https': 'http',
'ftps': 'ftp',
'm3u8_native': 'm3u8',
'http_dash_segments_generator': 'dash',
'rtmp_ffmpeg': 'rtmp',
'm3u8_frag_urls': 'm3u8',
'dash_frag_urls': 'dash',

View File

@@ -1,4 +1,5 @@
from __future__ import unicode_literals
import time
from ..downloader import get_suitable_downloader
from .fragment import FragmentFD
@@ -15,27 +16,53 @@ class DashSegmentsFD(FragmentFD):
FD_NAME = 'dashsegments'
def real_download(self, filename, info_dict):
if info_dict.get('is_live'):
if info_dict.get('is_live') and set(info_dict['protocol'].split('+')) != {'http_dash_segments_generator'}:
self.report_error('Live DASH videos are not supported')
fragment_base_url = info_dict.get('fragment_base_url')
fragments = info_dict['fragments'][:1] if self.params.get(
'test', False) else info_dict['fragments']
real_start = time.time()
real_downloader = get_suitable_downloader(
info_dict, self.params, None, protocol='dash_frag_urls', to_stdout=(filename == '-'))
ctx = {
'filename': filename,
'total_frags': len(fragments),
}
requested_formats = [{**info_dict, **fmt} for fmt in info_dict.get('requested_formats', [])]
args = []
for fmt in requested_formats or [info_dict]:
try:
fragment_count = 1 if self.params.get('test') else len(fmt['fragments'])
except TypeError:
fragment_count = None
ctx = {
'filename': fmt.get('filepath') or filename,
'live': 'is_from_start' if fmt.get('is_from_start') else fmt.get('is_live'),
'total_frags': fragment_count,
}
if real_downloader:
self._prepare_external_frag_download(ctx)
else:
self._prepare_and_start_frag_download(ctx, info_dict)
if real_downloader:
self._prepare_external_frag_download(ctx)
else:
self._prepare_and_start_frag_download(ctx, fmt)
ctx['start'] = real_start
fragments_to_download = self._get_fragments(fmt, ctx)
if real_downloader:
self.to_screen(
'[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename()))
info_dict['fragments'] = fragments_to_download
fd = real_downloader(self.ydl, self.params)
return fd.real_download(filename, info_dict)
args.append([ctx, fragments_to_download, fmt])
return self.download_and_append_fragments_multiple(*args)
def _resolve_fragments(self, fragments, ctx):
fragments = fragments(ctx) if callable(fragments) else fragments
return [next(fragments)] if self.params.get('test') else fragments
def _get_fragments(self, fmt, ctx):
fragment_base_url = fmt.get('fragment_base_url')
fragments = self._resolve_fragments(fmt['fragments'], ctx)
fragments_to_download = []
frag_index = 0
for i, fragment in enumerate(fragments):
frag_index += 1
@@ -46,17 +73,8 @@ class DashSegmentsFD(FragmentFD):
assert fragment_base_url
fragment_url = urljoin(fragment_base_url, fragment['path'])
fragments_to_download.append({
yield {
'frag_index': frag_index,
'index': i,
'url': fragment_url,
})
if real_downloader:
self.to_screen(
'[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename()))
info_dict['fragments'] = fragments_to_download
fd = real_downloader(self.ydl, self.params)
return fd.real_download(filename, info_dict)
return self.download_and_append_fragments(ctx, fragments_to_download, info_dict)
}

View File

@@ -366,7 +366,7 @@ class F4mFD(FragmentFD):
ctx = {
'filename': filename,
'total_frags': total_frags,
'live': live,
'live': bool(live),
}
self._prepare_frag_download(ctx)

View File

@@ -1,9 +1,10 @@
from __future__ import division, unicode_literals
import http.client
import json
import math
import os
import time
import json
from math import ceil
try:
import concurrent.futures
@@ -15,6 +16,7 @@ from .common import FileDownloader
from .http import HttpFD
from ..aes import aes_cbc_decrypt_bytes
from ..compat import (
compat_os_name,
compat_urllib_error,
compat_struct_pack,
)
@@ -90,7 +92,7 @@ class FragmentFD(FileDownloader):
self._start_frag_download(ctx, info_dict)
def __do_ytdl_file(self, ctx):
return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')
return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')
def _read_ytdl_file(self, ctx):
assert 'ytdl_corrupt' not in ctx
@@ -375,17 +377,20 @@ class FragmentFD(FileDownloader):
@params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
all args must be either tuple or list
'''
interrupt_trigger = [True]
max_progress = len(args)
if max_progress == 1:
return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
max_workers = self.params.get('concurrent_fragment_downloads', 1)
if max_progress > 1:
self._prepare_multiline_status(max_progress)
def thread_func(idx, ctx, fragments, info_dict, tpe):
ctx['max_progress'] = max_progress
ctx['progress_idx'] = idx
return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)
return self.download_and_append_fragments(
ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func,
tpe=tpe, interrupt_trigger=interrupt_trigger)
class FTPE(concurrent.futures.ThreadPoolExecutor):
# has to stop this or it's going to wait on the worker thread itself
@@ -393,8 +398,11 @@ class FragmentFD(FileDownloader):
pass
spins = []
if compat_os_name == 'nt':
self.report_warning('Ctrl+C does not work on Windows when used with parallel threads. '
'This is a known issue and patches are welcome')
for idx, (ctx, fragments, info_dict) in enumerate(args):
tpe = FTPE(ceil(max_workers / max_progress))
tpe = FTPE(math.ceil(max_workers / max_progress))
job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
spins.append((tpe, job))
@@ -402,18 +410,32 @@ class FragmentFD(FileDownloader):
for tpe, job in spins:
try:
result = result and job.result()
except KeyboardInterrupt:
interrupt_trigger[0] = False
finally:
tpe.shutdown(wait=True)
if not interrupt_trigger[0]:
raise KeyboardInterrupt()
return result
def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
def download_and_append_fragments(
self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None,
tpe=None, interrupt_trigger=None):
if not interrupt_trigger:
interrupt_trigger = (True, )
fragment_retries = self.params.get('fragment_retries', 0)
is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
is_fatal = (
((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0))
if self.params.get('skip_unavailable_fragments', True) else (lambda _: True))
if not pack_func:
pack_func = lambda frag_content, _: frag_content
def download_fragment(fragment, ctx):
frag_index = ctx['fragment_index'] = fragment['frag_index']
if not interrupt_trigger[0]:
return False, frag_index
headers = info_dict.get('http_headers', {}).copy()
byte_range = fragment.get('byte_range')
if byte_range:
@@ -428,7 +450,7 @@ class FragmentFD(FileDownloader):
if not success:
return False, frag_index
break
except compat_urllib_error.HTTPError as err:
except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err:
# Unavailable (possibly temporary) fragments may be served.
# First we try to retry then either skip or abort.
# See https://github.com/ytdl-org/youtube-dl/issues/10165,
@@ -466,7 +488,8 @@ class FragmentFD(FileDownloader):
decrypt_fragment = self.decrypter(info_dict)
max_workers = self.params.get('concurrent_fragment_downloads', 1)
max_workers = math.ceil(
self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
if can_threaded_download and max_workers > 1:
def _download_fragment(fragment):
@@ -477,6 +500,8 @@ class FragmentFD(FileDownloader):
self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
if not interrupt_trigger[0]:
break
ctx['fragment_filename_sanitized'] = frag_filename
ctx['fragment_index'] = frag_index
result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
@@ -484,6 +509,8 @@ class FragmentFD(FileDownloader):
return False
else:
for fragment in fragments:
if not interrupt_trigger[0]:
break
frag_content, frag_index = download_fragment(fragment, ctx)
result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
if not result: