From 1a3feec481f33035f356971ef1f8a7f7cc9d0d48 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 18 Jan 2018 13:55:03 +0100 Subject: [PATCH] Move upload/download file methods to the TelegramClient --- telethon/telegram_bare_client.py | 217 +-------------------------- telethon/telegram_client.py | 248 ++++++++++++++++++++++++++++++- 2 files changed, 251 insertions(+), 214 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index af86c0f1..9684a034 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -3,19 +3,16 @@ import os import threading import warnings from datetime import timedelta, datetime -from hashlib import md5 -from io import BytesIO from signal import signal, SIGINT, SIGTERM, SIGABRT from threading import Lock from time import sleep -from . import helpers as utils, version -from .crypto import rsa, CdnDecrypter +from . import version +from .crypto import rsa from .errors import ( - RPCError, BrokenAuthKeyError, ServerError, - FloodWaitError, FloodTestPhoneWaitError, FileMigrateError, - TypeNotFoundError, UnauthorizedError, PhoneMigrateError, - NetworkMigrateError, UserMigrateError + RPCError, BrokenAuthKeyError, ServerError, FloodWaitError, + FloodTestPhoneWaitError, TypeNotFoundError, UnauthorizedError, + PhoneMigrateError, NetworkMigrateError, UserMigrateError ) from .network import authenticator, MtProtoSender, Connection, ConnectionMode from .tl import TLObject, Session @@ -30,15 +27,8 @@ from .tl.functions.help import ( GetCdnConfigRequest, GetConfigRequest ) from .tl.functions.updates import GetStateRequest -from .tl.functions.upload import ( - GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest -) -from .tl.types import InputFile, InputFileBig from .tl.types.auth import ExportedAuthorization -from .tl.types.upload import FileCdnRedirect from .update_state import UpdateState -from .utils import get_appropriated_part_size - DEFAULT_DC_ID = 4 DEFAULT_IPV4_IP = '149.154.167.51' @@ -565,203 +555,6 @@ class TelegramBareClient: # endregion - # region Uploading media - - def upload_file(self, - file, - part_size_kb=None, - file_name=None, - allow_cache=True, - progress_callback=None): - """Uploads the specified file and returns a handle (an instance - of InputFile or InputFileBig, as required) which can be later used. - - Uploading a file will simply return a "handle" to the file stored - remotely in the Telegram servers, which can be later used on. This - will NOT upload the file to your own chat. - - 'file' may be either a file path, a byte array, or a stream. - Note that if the file is a stream it will need to be read - entirely into memory to tell its size first. - - If 'progress_callback' is not None, it should be a function that - takes two parameters, (bytes_uploaded, total_bytes). - - Default values for the optional parameters if left as None are: - part_size_kb = get_appropriated_part_size(file_size) - file_name = os.path.basename(file_path) - """ - if isinstance(file, (InputFile, InputFileBig)): - return file # Already uploaded - - if isinstance(file, str): - file_size = os.path.getsize(file) - elif isinstance(file, bytes): - file_size = len(file) - else: - file = file.read() - file_size = len(file) - - # File will now either be a string or bytes - if not part_size_kb: - part_size_kb = get_appropriated_part_size(file_size) - - if part_size_kb > 512: - raise ValueError('The part size must be less or equal to 512KB') - - part_size = int(part_size_kb * 1024) - if part_size % 1024 != 0: - raise ValueError('The part size must be evenly divisible by 1024') - - # Set a default file name if None was specified - file_id = utils.generate_random_long() - if not file_name: - if isinstance(file, str): - file_name = os.path.basename(file) - else: - file_name = str(file_id) - - # Determine whether the file is too big (over 10MB) or not - # Telegram does make a distinction between smaller or larger files - is_large = file_size > 10 * 1024 * 1024 - if not is_large: - # Calculate the MD5 hash before anything else. - # As this needs to be done always for small files, - # might as well do it before anything else and - # check the cache. - if isinstance(file, str): - with open(file, 'rb') as stream: - file = stream.read() - hash_md5 = md5(file) - else: - hash_md5 = None - - part_count = (file_size + part_size - 1) // part_size - __log__.info('Uploading file of %d bytes in %d chunks of %d', - file_size, part_count, part_size) - - with open(file, 'rb') if isinstance(file, str) else BytesIO(file) \ - as stream: - for part_index in range(part_count): - # Read the file by in chunks of size part_size - part = stream.read(part_size) - - # The SavePartRequest is different depending on whether - # the file is too large or not (over or less than 10MB) - if is_large: - request = SaveBigFilePartRequest(file_id, part_index, - part_count, part) - else: - request = SaveFilePartRequest(file_id, part_index, part) - - result = self(request) - if result: - __log__.debug('Uploaded %d/%d', part_index + 1, part_count) - if progress_callback: - progress_callback(stream.tell(), file_size) - else: - raise RuntimeError( - 'Failed to upload file part {}.'.format(part_index)) - - if is_large: - return InputFileBig(file_id, part_count, file_name) - else: - return InputFile(file_id, part_count, file_name, - md5_checksum=hash_md5.hexdigest()) - - # endregion - - # region Downloading media - - def download_file(self, - input_location, - file, - part_size_kb=None, - file_size=None, - progress_callback=None): - """Downloads the given InputFileLocation to file (a stream or str). - - If 'progress_callback' is not None, it should be a function that - takes two parameters, (bytes_downloaded, total_bytes). Note that - 'total_bytes' simply equals 'file_size', and may be None. - """ - if not part_size_kb: - if not file_size: - part_size_kb = 64 # Reasonable default - else: - part_size_kb = get_appropriated_part_size(file_size) - - part_size = int(part_size_kb * 1024) - # https://core.telegram.org/api/files says: - # > part_size % 1024 = 0 (divisible by 1KB) - # - # But https://core.telegram.org/cdn (more recent) says: - # > limit must be divisible by 4096 bytes - # So we just stick to the 4096 limit. - if part_size % 4096 != 0: - raise ValueError('The part size must be evenly divisible by 4096.') - - if isinstance(file, str): - # Ensure that we'll be able to download the media - utils.ensure_parent_dir_exists(file) - f = open(file, 'wb') - else: - f = file - - # The used client will change if FileMigrateError occurs - client = self - cdn_decrypter = None - - __log__.info('Downloading file in chunks of %d bytes', part_size) - try: - offset = 0 - while True: - try: - if cdn_decrypter: - result = cdn_decrypter.get_file() - else: - result = client(GetFileRequest( - input_location, offset, part_size - )) - - if isinstance(result, FileCdnRedirect): - __log__.info('File lives in a CDN') - cdn_decrypter, result = \ - CdnDecrypter.prepare_decrypter( - client, self._get_cdn_client(result), result - ) - - except FileMigrateError as e: - __log__.info('File lives in another DC') - client = self._get_exported_client(e.new_dc) - continue - - offset += part_size - - # If we have received no data (0 bytes), the file is over - # So there is nothing left to download and write - if not result.bytes: - # Return some extra information, unless it's a CDN file - return getattr(result, 'type', '') - - f.write(result.bytes) - __log__.debug('Saved %d more bytes', len(result.bytes)) - if progress_callback: - progress_callback(f.tell(), file_size) - finally: - if client != self: - client.disconnect() - - if cdn_decrypter: - try: - cdn_decrypter.client.disconnect() - except: - pass - if isinstance(file, str): - f.close() - - # endregion - # region Updates handling def sync_updates(self): diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 84c9beea..6e69e3f6 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1,4 +1,6 @@ +import hashlib import itertools +import logging import os import sys import time @@ -6,6 +8,14 @@ from collections import OrderedDict, UserList from datetime import datetime, timedelta from mimetypes import guess_type +from io import BytesIO + +from telethon.crypto import CdnDecrypter +from telethon.tl.functions.upload import ( + SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest +) +from telethon.tl.types.upload import FileCdnRedirect + try: import socks except ImportError: @@ -16,7 +26,8 @@ from . import helpers, utils from .errors import ( RPCError, UnauthorizedError, PhoneCodeEmptyError, PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError, LocationInvalidError, - SessionPasswordNeededError, FilePartMissingError) + SessionPasswordNeededError, FilePartMissingError, FileMigrateError +) from .network import ConnectionMode from .tl import TLObject from .tl.custom import Draft, Dialog @@ -55,11 +66,13 @@ from .tl.types import ( UpdateNewChannelMessage, UpdateNewMessage, UpdateShortSentMessage, PeerUser, InputPeerUser, InputPeerChat, InputPeerChannel, MessageEmpty, ChatInvite, ChatInviteAlready, PeerChannel, Photo, InputPeerSelf, - InputSingleMedia, InputMediaPhoto, InputPhoto + InputSingleMedia, InputMediaPhoto, InputPhoto, InputFile, InputFileBig ) from .tl.types.messages import DialogsSlice from .extensions import markdown +__log__ = logging.getLogger(__name__) + class TelegramClient(TelegramBareClient): """ @@ -1011,6 +1024,130 @@ class TelegramClient(TelegramBareClient): progress_callback=progress_callback, reply_to=reply_to ) + def upload_file(self, + file, + part_size_kb=None, + file_name=None, + allow_cache=True, + progress_callback=None): + """ + Uploads the specified file and returns a handle (an instance of + InputFile or InputFileBig, as required) which can be later used + before it expires (they are usable during less than a day). + + Uploading a file will simply return a "handle" to the file stored + remotely in the Telegram servers, which can be later used on. This + will **not** upload the file to your own chat or any chat at all. + + Args: + file (:obj:`str` | :obj:`bytes` | :obj:`file`): + The path of the file, byte array, or stream that will be sent. + Note that if a byte array or a stream is given, a filename + or its type won't be inferred, and it will be sent as an + "unnamed application/octet-stream". + + Subsequent calls with the very same file will result in + immediate uploads, unless ``.clear_file_cache()`` is called. + + part_size_kb (:obj:`int`, optional): + Chunk size when uploading files. The larger, the less + requests will be made (up to 512KB maximum). + + file_name (:obj:`str`, optional): + The file name which will be used on the resulting InputFile. + If not specified, the name will be taken from the ``file`` + and if this is not a ``str``, it will be ``"unnamed"``. + + allow_cache (:obj:`bool`, optional): + Whether to allow reusing the file from cache or not. Unused. + + progress_callback (:obj:`callable`, optional): + A callback function accepting two parameters: + ``(sent bytes, total)``. + + Returns: + The InputFile (or InputFileBig if >10MB). + """ + if isinstance(file, (InputFile, InputFileBig)): + return file # Already uploaded + + if isinstance(file, str): + file_size = os.path.getsize(file) + elif isinstance(file, bytes): + file_size = len(file) + else: + file = file.read() + file_size = len(file) + + # File will now either be a string or bytes + if not part_size_kb: + part_size_kb = utils.get_appropriated_part_size(file_size) + + if part_size_kb > 512: + raise ValueError('The part size must be less or equal to 512KB') + + part_size = int(part_size_kb * 1024) + if part_size % 1024 != 0: + raise ValueError( + 'The part size must be evenly divisible by 1024') + + # Set a default file name if None was specified + file_id = helpers.generate_random_long() + if not file_name: + if isinstance(file, str): + file_name = os.path.basename(file) + else: + file_name = str(file_id) + + # Determine whether the file is too big (over 10MB) or not + # Telegram does make a distinction between smaller or larger files + is_large = file_size > 10 * 1024 * 1024 + if not is_large: + # Calculate the MD5 hash before anything else. + # As this needs to be done always for small files, + # might as well do it before anything else and + # check the cache. + if isinstance(file, str): + with open(file, 'rb') as stream: + file = stream.read() + hash_md5 = hashlib.md5(file) + else: + hash_md5 = None + + part_count = (file_size + part_size - 1) // part_size + __log__.info('Uploading file of %d bytes in %d chunks of %d', + file_size, part_count, part_size) + + with open(file, 'rb') if isinstance(file, str) else BytesIO(file) \ + as stream: + for part_index in range(part_count): + # Read the file by in chunks of size part_size + part = stream.read(part_size) + + # The SavePartRequest is different depending on whether + # the file is too large or not (over or less than 10MB) + if is_large: + request = SaveBigFilePartRequest(file_id, part_index, + part_count, part) + else: + request = SaveFilePartRequest(file_id, part_index, part) + + result = self(request) + if result: + __log__.debug('Uploaded %d/%d', part_index + 1, + part_count) + if progress_callback: + progress_callback(stream.tell(), file_size) + else: + raise RuntimeError( + 'Failed to upload file part {}.'.format(part_index)) + + if is_large: + return InputFileBig(file_id, part_count, file_name) + else: + return InputFile(file_id, part_count, file_name, + md5_checksum=hash_md5.hexdigest()) + # endregion # region Downloading media requests @@ -1292,6 +1429,113 @@ class TelegramClient(TelegramBareClient): return result i += 1 + def download_file(self, + input_location, + file, + part_size_kb=None, + file_size=None, + progress_callback=None): + """ + Downloads the given input location to a file. + + Args: + input_location (:obj:`InputFileLocation`): + The file location from which the file will be downloaded. + + file (:obj:`str` | :obj:`file`, optional): + The output file path, directory, or stream-like object. + If the path exists and is a file, it will be overwritten. + + part_size_kb (:obj:`int`, optional): + Chunk size when downloading files. The larger, the less + requests will be made (up to 512KB maximum). + + file_size (:obj:`int`, optional): + The file size that is about to be downloaded, if known. + Only used if ``progress_callback`` is specified. + + progress_callback (:obj:`callable`, optional): + A callback function accepting two parameters: + ``(downloaded bytes, total)``. Note that the + ``total`` is the provided ``file_size``. + """ + if not part_size_kb: + if not file_size: + part_size_kb = 64 # Reasonable default + else: + part_size_kb = utils.get_appropriated_part_size(file_size) + + part_size = int(part_size_kb * 1024) + # https://core.telegram.org/api/files says: + # > part_size % 1024 = 0 (divisible by 1KB) + # + # But https://core.telegram.org/cdn (more recent) says: + # > limit must be divisible by 4096 bytes + # So we just stick to the 4096 limit. + if part_size % 4096 != 0: + raise ValueError( + 'The part size must be evenly divisible by 4096.') + + if isinstance(file, str): + # Ensure that we'll be able to download the media + helpers.ensure_parent_dir_exists(file) + f = open(file, 'wb') + else: + f = file + + # The used client will change if FileMigrateError occurs + client = self + cdn_decrypter = None + + __log__.info('Downloading file in chunks of %d bytes', part_size) + try: + offset = 0 + while True: + try: + if cdn_decrypter: + result = cdn_decrypter.get_file() + else: + result = client(GetFileRequest( + input_location, offset, part_size + )) + + if isinstance(result, FileCdnRedirect): + __log__.info('File lives in a CDN') + cdn_decrypter, result = \ + CdnDecrypter.prepare_decrypter( + client, self._get_cdn_client(result), + result + ) + + except FileMigrateError as e: + __log__.info('File lives in another DC') + client = self._get_exported_client(e.new_dc) + continue + + offset += part_size + + # If we have received no data (0 bytes), the file is over + # So there is nothing left to download and write + if not result.bytes: + # Return some extra information, unless it's a CDN file + return getattr(result, 'type', '') + + f.write(result.bytes) + __log__.debug('Saved %d more bytes', len(result.bytes)) + if progress_callback: + progress_callback(f.tell(), file_size) + finally: + if client != self: + client.disconnect() + + if cdn_decrypter: + try: + cdn_decrypter.client.disconnect() + except: + pass + if isinstance(file, str): + f.close() + # endregion # endregion