diff --git a/telethon/utils.py b/telethon/utils.py index f74ef880..86a8661e 100644 --- a/telethon/utils.py +++ b/telethon/utils.py @@ -601,16 +601,52 @@ def get_message_id(message): def _get_metadata(file): - # `hachoir` only deals with paths to in-disk files, while - # `_get_extension` supports a few other things. The parser - # may also fail in any case and we don't want to crash if + if not hachoir: + return + + stream = None + close_stream = True + seekable = True + + # The parser may fail and we don't want to crash if # the extraction process fails. - if hachoir and isinstance(file, str) and os.path.isfile(file): - try: - with hachoir.parser.createParser(file) as parser: - return hachoir.metadata.extractMetadata(parser) - except Exception as e: - _log.warning('Failed to analyze %s: %s %s', file, e.__class__, e) + try: + # Note: aiofiles are intentionally left out for simplicity + if isinstance(file, str): + stream = open(file, 'rb') + elif isinstance(file, bytes): + stream = io.BytesIO(file) + else: + stream = file + close_stream = False + if getattr(file, 'seekable', None): + seekable = file.seekable() + else: + seekable = False + + if not seekable: + return None + + pos = stream.tell() + filename = getattr(file, 'name', '') + + parser = hachoir.parser.guess.guessParser(hachoir.stream.InputIOStream( + stream, + source='file:' + filename, + tags=[], + filename=filename + )) + + return hachoir.metadata.extractMetadata(parser) + + except Exception as e: + _log.warning('Failed to analyze %s: %s %s', file, e.__class__, e) + + finally: + if stream and close_stream: + stream.close() + elif stream and seekable: + stream.seek(pos) def get_attributes(file, *, attributes=None, mime_type=None, @@ -803,15 +839,31 @@ def is_gif(file): def is_audio(file): - """Returns `True` if the file extension looks like an audio file.""" - file = 'a' + _get_extension(file) - return (mimetypes.guess_type(file)[0] or '').startswith('audio/') + """Returns `True` if the file has an audio mime type.""" + ext = _get_extension(file) + if not ext: + metadata = _get_metadata(file) + if metadata and metadata.has('mime_type'): + return metadata.get('mime_type').startswith('audio/') + else: + return False + else: + file = 'a' + ext + return (mimetypes.guess_type(file)[0] or '').startswith('audio/') def is_video(file): - """Returns `True` if the file extension looks like a video file.""" - file = 'a' + _get_extension(file) - return (mimetypes.guess_type(file)[0] or '').startswith('video/') + """Returns `True` if the file has a video mime type.""" + ext = _get_extension(file) + if not ext: + metadata = _get_metadata(file) + if metadata and metadata.has('mime_type'): + return metadata.get('mime_type').startswith('video/') + else: + return False + else: + file = 'a' + ext + return (mimetypes.guess_type(file)[0] or '').startswith('video/') def is_list_like(obj):