[websockets] Add WebSocketFragmentFD (#399)

Necessary for #392

Co-authored by: nao20010128nao, pukkandan
This commit is contained in:
pukkandan
2021-06-21 22:53:17 +05:30
parent ff0f78e1fe
commit e36d50c5dd
14 changed files with 140 additions and 18 deletions

View File

@@ -24,6 +24,7 @@ from .rtsp import RtspFD
from .ism import IsmFD
from .mhtml import MhtmlFD
from .niconico import NiconicoDmcFD
from .websocket import WebSocketFragmentFD
from .youtube_live_chat import YoutubeLiveChatReplayFD
from .external import (
get_external_downloader,
@@ -42,6 +43,7 @@ PROTOCOL_MAP = {
'ism': IsmFD,
'mhtml': MhtmlFD,
'niconico_dmc': NiconicoDmcFD,
'websocket_frag': WebSocketFragmentFD,
'youtube_live_chat_replay': YoutubeLiveChatReplayFD,
}
@@ -52,6 +54,7 @@ def shorten_protocol_name(proto, simplify=False):
'rtmp_ffmpeg': 'rtmp_f',
'http_dash_segments': 'dash',
'niconico_dmc': 'dmc',
'websocket_frag': 'WSfrag',
}
if simplify:
short_protocol_names.update({

View File

@@ -347,6 +347,10 @@ class FFmpegFD(ExternalFD):
# TODO: Fix path for ffmpeg
return FFmpegPostProcessor().available
def on_process_started(self, proc, stdin):
""" Override this in subclasses """
pass
def _call_downloader(self, tmpfilename, info_dict):
urls = [f['url'] for f in info_dict.get('requested_formats', [])] or [info_dict['url']]
ffpp = FFmpegPostProcessor(downloader=self)
@@ -474,6 +478,8 @@ class FFmpegFD(ExternalFD):
self._debug_cmd(args)
proc = subprocess.Popen(args, stdin=subprocess.PIPE, env=env)
if url in ('-', 'pipe:'):
self.on_process_started(proc, proc.stdin)
try:
retval = proc.wait()
except BaseException as e:
@@ -482,7 +488,7 @@ class FFmpegFD(ExternalFD):
# produces a file that is playable (this is mostly useful for live
# streams). Note that Windows is not affected and produces playable
# files (see https://github.com/ytdl-org/youtube-dl/issues/8300).
if isinstance(e, KeyboardInterrupt) and sys.platform != 'win32':
if isinstance(e, KeyboardInterrupt) and sys.platform != 'win32' and url not in ('-', 'pipe:'):
process_communicate_or_kill(proc, b'q')
else:
proc.kill()

View File

@@ -0,0 +1,59 @@
import os
import signal
import asyncio
import threading
try:
import websockets
has_websockets = True
except ImportError:
has_websockets = False
from .common import FileDownloader
from .external import FFmpegFD
class FFmpegSinkFD(FileDownloader):
""" A sink to ffmpeg for downloading fragments in any form """
def real_download(self, filename, info_dict):
info_copy = info_dict.copy()
info_copy['url'] = '-'
async def call_conn(proc, stdin):
try:
await self.real_connection(stdin, info_dict)
except (BrokenPipeError, OSError):
pass
finally:
try:
stdin.flush()
stdin.close()
except OSError:
pass
os.kill(os.getpid(), signal.SIGINT)
class FFmpegStdinFD(FFmpegFD):
@classmethod
def get_basename(cls):
return FFmpegFD.get_basename()
def on_process_started(self, proc, stdin):
thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), ))
thread.start()
return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy)
async def real_connection(self, sink, info_dict):
""" Override this in subclasses """
raise NotImplementedError('This method must be implemented by subclasses')
class WebSocketFragmentFD(FFmpegSinkFD):
async def real_connection(self, sink, info_dict):
async with websockets.connect(info_dict['url'], extra_headers=info_dict.get('http_headers', {})) as ws:
while True:
recv = await ws.recv()
if isinstance(recv, str):
recv = recv.encode('utf8')
sink.write(recv)