diff --git a/.gitignore b/.gitignore
index f2090e85..c8b7b750 100755
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
# Docs
_build/
+docs/
# Generated code
telethon/tl/functions/
diff --git a/docs/.gitignore b/docs/.gitignore
deleted file mode 100644
index 9ab870da..00000000
--- a/docs/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-generated/
diff --git a/readthedocs/extra/basic/installation.rst b/readthedocs/extra/basic/installation.rst
index c00ea79c..7a240eff 100644
--- a/readthedocs/extra/basic/installation.rst
+++ b/readthedocs/extra/basic/installation.rst
@@ -54,7 +54,7 @@ Manual Installation
3. Enter the cloned repository: ``cd Telethon``
-4. Run the code generator: ``python3 setup.py gen_tl``
+4. Run the code generator: ``python3 setup.py gen tl errors``
5. Done!
diff --git a/setup.py b/setup.py
index 7b0a5db5..e1a7e92e 100755
--- a/setup.py
+++ b/setup.py
@@ -6,16 +6,16 @@ https://packaging.python.org/en/latest/distributing.html
https://github.com/pypa/sampleproject
Extra supported commands are:
-* gen_tl, to generate the classes required for Telethon to run
-* clean_tl, to clean these generated classes
+* gen, to generate the classes required for Telethon to run, or the docs
* pypi, to generate sdist, bdist_wheel, and push to PyPi
"""
-# To use a consistent encoding
-from codecs import open
-from sys import argv, version_info
import os
import re
+import shutil
+# To use a consistent encoding
+from codecs import open
+from sys import argv, version_info
# Always prefer setuptools over distutils
from setuptools import find_packages, setup
@@ -37,44 +37,85 @@ class TempWorkDir:
os.chdir(self.original)
-ERROR_LIST = 'telethon/errors/rpc_error_list.py'
-ERRORS_JSON = 'telethon_generator/errors.json'
-ERRORS_DESC = 'telethon_generator/error_descriptions'
-SCHEME_TL = 'telethon_generator/scheme.tl'
-GENERATOR_DIR = 'telethon/tl'
+GENERATOR_DIR = 'telethon_generator'
+LIBRARY_DIR = 'telethon'
+
+ERRORS_IN_JSON = os.path.join(GENERATOR_DIR, 'data', 'errors.json')
+ERRORS_IN_DESC = os.path.join(GENERATOR_DIR, 'data', 'error_descriptions')
+ERRORS_OUT = os.path.join(LIBRARY_DIR, 'errors', 'rpc_error_list.py')
+
+TLOBJECT_IN_TL = os.path.join(GENERATOR_DIR, 'data', 'scheme.tl')
+TLOBJECT_OUT = os.path.join(LIBRARY_DIR, 'tl')
IMPORT_DEPTH = 2
+DOCS_IN_RES = os.path.join(GENERATOR_DIR, 'data', 'html')
+DOCS_OUT = 'docs'
-def gen_tl(force=True):
- from telethon_generator.tl_generator import TLGenerator
- from telethon_generator.error_generator import generate_code
- generator = TLGenerator(GENERATOR_DIR)
- if generator.tlobjects_exist():
- if not force:
- return
- print('Detected previous TLObjects. Cleaning...')
- generator.clean_tlobjects()
- print('Generating TLObjects...')
- generator.generate_tlobjects(SCHEME_TL, import_depth=IMPORT_DEPTH)
- print('Generating errors...')
- generate_code(ERROR_LIST, json_file=ERRORS_JSON, errors_desc=ERRORS_DESC)
- print('Done.')
+def generate(which):
+ from telethon_generator.parsers import parse_errors, parse_tl, find_layer
+ from telethon_generator.generators import\
+ generate_errors, generate_tlobjects, generate_docs, clean_tlobjects
+
+ tlobjects = list(parse_tl(TLOBJECT_IN_TL, ignore_core=True))
+ errors = list(parse_errors(ERRORS_IN_JSON, ERRORS_IN_DESC))
+ layer = find_layer(TLOBJECT_IN_TL)
+
+ if not which:
+ which.extend(('tl', 'errors'))
+
+ clean = 'clean' in which
+ action = 'Cleaning' if clean else 'Generating'
+ if clean:
+ which.remove('clean')
+
+ if 'all' in which:
+ which.remove('all')
+ for x in ('tl', 'errors', 'docs'):
+ if x not in which:
+ which.append(x)
+
+ if 'tl' in which:
+ which.remove('tl')
+ print(action, 'TLObjects...')
+ if clean:
+ clean_tlobjects(TLOBJECT_OUT)
+ else:
+ generate_tlobjects(tlobjects, layer, IMPORT_DEPTH, TLOBJECT_OUT)
+
+ if 'errors' in which:
+ which.remove('errors')
+ print(action, 'RPCErrors...')
+ if clean:
+ if os.path.isfile(ERRORS_OUT):
+ os.remove(ERRORS_OUT)
+ else:
+ with open(ERRORS_OUT, 'w', encoding='utf-8') as file:
+ generate_errors(errors, file)
+
+ if 'docs' in which:
+ which.remove('docs')
+ print(action, 'documentation...')
+ if clean:
+ if os.path.isdir(DOCS_OUT):
+ shutil.rmtree(DOCS_OUT)
+ else:
+ generate_docs(tlobjects, errors, layer, DOCS_IN_RES, DOCS_OUT)
+
+ if which:
+ print('The following items were not understood:', which)
+ print(' Consider using only "tl", "errors" and/or "docs".')
+ print(' Using only "clean" will clean them. "all" to act on all.')
+ print(' For instance "gen tl errors".')
def main():
- if len(argv) >= 2 and argv[1] == 'gen_tl':
- gen_tl()
-
- elif len(argv) >= 2 and argv[1] == 'clean_tl':
- from telethon_generator.tl_generator import TLGenerator
- print('Cleaning...')
- TLGenerator(GENERATOR_DIR).clean_tlobjects()
- print('Done.')
+ if len(argv) >= 2 and argv[1] == 'gen':
+ generate(argv[2:])
elif len(argv) >= 2 and argv[1] == 'pypi':
# (Re)generate the code to make sure we don't push without it
- gen_tl()
+        generate(['clean', 'tl', 'errors'])
+        generate(['tl', 'errors'])
# Try importing the telethon module to assert it has no errors
try:
@@ -96,14 +137,10 @@ def main():
for x in ('build', 'dist', 'Telethon.egg-info'):
rmtree(x, ignore_errors=True)
- elif len(argv) >= 2 and argv[1] == 'fetch_errors':
- from telethon_generator.error_generator import fetch_errors
- fetch_errors(ERRORS_JSON)
-
else:
- # Call gen_tl() if the scheme.tl file exists, e.g. install from GitHub
- if os.path.isfile(SCHEME_TL):
- gen_tl(force=False)
+        # e.g. installing from GitHub: generate the code only if the
+        # generator data is available
+        if os.path.isdir(GENERATOR_DIR):
+            generate(['tl', 'errors'])
# Get the long description from the README file
with open('README.rst', encoding='utf-8') as f:
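
A quick sketch of how the new "gen" command is meant to be driven (illustrative only; the target names come from `generate()` above):

```python
# Run from the repository root. "clean" switches the listed targets to
# cleanup mode, and "all" expands to tl + errors + docs.
import subprocess

subprocess.run(['python3', 'setup.py', 'gen', 'tl', 'errors'], check=True)  # TLObjects + RPC errors
subprocess.run(['python3', 'setup.py', 'gen', 'docs'], check=True)          # HTML documentation
subprocess.run(['python3', 'setup.py', 'gen', 'clean', 'all'], check=True)  # remove everything generated
```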
diff --git a/telethon/errors/__init__.py b/telethon/errors/__init__.py
index d9875849..8b4e9f88 100644
--- a/telethon/errors/__init__.py
+++ b/telethon/errors/__init__.py
@@ -12,6 +12,7 @@ from .common import (
)
# This imports the base errors too, as they're imported there
+from .rpc_base_errors import *
from .rpc_error_list import *
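
Re-exporting `rpc_base_errors` lets callers catch whole error families instead of single generated classes. A minimal sketch (`FloodWaitError` and `RPCError` are the existing names; the retry logic itself is illustrative):

```python
import time
from telethon.errors import FloodWaitError, RPCError

def call_with_retry(client, request):
    try:
        return client(request)
    except FloodWaitError as e:   # generated subclass, carries .seconds
        time.sleep(e.seconds)
        return client(request)
    except RPCError as e:         # base class, re-exported above
        print('Request failed:', e)
        return None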
diff --git a/telethon/tl/tlobject.py b/telethon/tl/tlobject.py
index 57975ec3..eee9222a 100644
--- a/telethon/tl/tlobject.py
+++ b/telethon/tl/tlobject.py
@@ -189,6 +189,6 @@ class TLObject:
def __bytes__(self):
return b''
- @staticmethod
- def from_reader(reader):
+ @classmethod
+ def from_reader(cls, reader):
return TLObject()
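
Switching to a classmethod gives `from_reader` access to the concrete subclass via `cls` (the stub above still returns a plain `TLObject`). A standalone illustration of what the classmethod form enables, not Telethon code:

```python
class Base:
    @classmethod
    def from_reader(cls, reader):
        # A @staticmethod would have to hard-code Base() here; with a
        # classmethod a subclass that doesn't override this still gets
        # an instance of itself.
        return cls()

class Child(Base):
    pass

assert isinstance(Child.from_reader(reader=None), Child)
```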
diff --git a/telethon_generator/error_descriptions b/telethon_generator/data/error_descriptions
similarity index 100%
rename from telethon_generator/error_descriptions
rename to telethon_generator/data/error_descriptions
diff --git a/telethon_generator/errors.json b/telethon_generator/data/errors.json
similarity index 100%
rename from telethon_generator/errors.json
rename to telethon_generator/data/errors.json
diff --git a/docs/res/404.html b/telethon_generator/data/html/404.html
similarity index 100%
rename from docs/res/404.html
rename to telethon_generator/data/html/404.html
diff --git a/docs/res/core.html b/telethon_generator/data/html/core.html
similarity index 100%
rename from docs/res/core.html
rename to telethon_generator/data/html/core.html
diff --git a/docs/res/css/docs.css b/telethon_generator/data/html/css/docs.css
similarity index 100%
rename from docs/res/css/docs.css
rename to telethon_generator/data/html/css/docs.css
diff --git a/docs/res/img/arrow.svg b/telethon_generator/data/html/img/arrow.svg
similarity index 100%
rename from docs/res/img/arrow.svg
rename to telethon_generator/data/html/img/arrow.svg
diff --git a/docs/res/js/search.js b/telethon_generator/data/html/js/search.js
similarity index 100%
rename from docs/res/js/search.js
rename to telethon_generator/data/html/js/search.js
diff --git a/telethon_generator/scheme.tl b/telethon_generator/data/scheme.tl
similarity index 100%
rename from telethon_generator/scheme.tl
rename to telethon_generator/data/scheme.tl
diff --git a/docs/docs_writer.py b/telethon_generator/docs_writer.py
similarity index 98%
rename from docs/docs_writer.py
rename to telethon_generator/docs_writer.py
index 82241a48..82deef9d 100644
--- a/docs/docs_writer.py
+++ b/telethon_generator/docs_writer.py
@@ -4,7 +4,7 @@ import re
class DocsWriter:
"""Utility class used to write the HTML files used on the documentation"""
- def __init__(self, filename, type_to_path_function):
+ def __init__(self, filename, type_to_path):
"""Initializes the writer to the specified output file,
creating the parent directories when used if required.
@@ -19,7 +19,7 @@ class DocsWriter:
self.menu_separator_tag = None
# Utility functions TODO There must be a better way
- self.type_to_path = lambda t: type_to_path_function(
+ self.type_to_path = lambda t: type_to_path(
t, relative_to=self.filename
)
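
The rename matters because the docs generator now hands DocsWriter a partially-applied path function. A reduced illustration of the pattern (the paths and the body here are made up; the real callable is `get_path_for_type` from `generators/docs.py`):

```python
import functools
import os

def get_path_for_type(root, type_, relative_to='.'):
    # Reduced stand-in: map a type name to its documentation page.
    page = os.path.join(root, 'types', type_.lower() + '.html')
    return os.path.relpath(page, start=relative_to)

# Bind the output directory once; DocsWriter then only sees a callable
# taking (type_, relative_to=...), which is what type_to_path expects.
path_for_type = functools.partial(get_path_for_type, 'docs')
print(path_for_type('User', relative_to='docs/methods'))  # ../types/user.html
```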
diff --git a/telethon_generator/error_generator.py b/telethon_generator/error_generator.py
deleted file mode 100644
index 4aad78ec..00000000
--- a/telethon_generator/error_generator.py
+++ /dev/null
@@ -1,177 +0,0 @@
-import json
-import re
-import urllib.request
-from collections import defaultdict
-
-URL = 'https://rpc.pwrtelegram.xyz/?all'
-
-known_base_classes = {
- 303: 'InvalidDCError',
- 400: 'BadRequestError',
- 401: 'UnauthorizedError',
- 403: 'ForbiddenError',
- 404: 'NotFoundError',
- 406: 'AuthKeyError',
- 420: 'FloodError',
- 500: 'ServerError',
-}
-
-# The API doesn't return the code for some (vital) errors. They are
-# all assumed to be 400, except these well-known ones that aren't.
-known_codes = {
- 'ACTIVE_USER_REQUIRED': 401,
- 'AUTH_KEY_UNREGISTERED': 401,
- 'USER_DEACTIVATED': 401
-}
-
-
-def fetch_errors(output, url=URL):
- print('Opening a connection to', url, '...')
- r = urllib.request.urlopen(urllib.request.Request(
- url, headers={'User-Agent' : 'Mozilla/5.0'}
- ))
- print('Checking response...')
- data = json.loads(
- r.read().decode(r.info().get_param('charset') or 'utf-8')
- )
- if data.get('ok'):
- print('Response was okay, saving data')
- with open(output, 'w', encoding='utf-8') as f:
- json.dump(data, f, sort_keys=True)
- return True
- else:
- print('The data received was not okay:')
- print(json.dumps(data, indent=4, sort_keys=True))
- return False
-
-
-def get_class_name(error_code):
- if isinstance(error_code, int):
- return known_base_classes.get(
- error_code, 'RPCError' + str(error_code).replace('-', 'Neg')
- )
-
- if 'FIRSTNAME' in error_code:
- error_code = error_code.replace('FIRSTNAME', 'FIRST_NAME')
-
- result = re.sub(
- r'_([a-z])', lambda m: m.group(1).upper(), error_code.lower()
- )
- return result[:1].upper() + result[1:].replace('_', '') + 'Error'
-
-
-def write_error(f, code, name, desc, capture_name):
- f.write(
- '\n\nclass {}({}):\n def __init__(self, **kwargs):\n '
- ''.format(name, get_class_name(code))
- )
- if capture_name:
- f.write(
- "self.{} = int(kwargs.get('capture', 0))\n ".format(capture_name)
- )
- f.write('super(Exception, self).__init__({}'.format(repr(desc)))
- if capture_name:
- f.write('.format(self.{})'.format(capture_name))
- f.write(')\n')
-
-
-def generate_code(output, json_file, errors_desc):
- with open(json_file, encoding='utf-8') as f:
- data = json.load(f)
-
- errors = defaultdict(set)
- # PWRTelegram's API doesn't return all errors, which we do need here.
- # Add some special known-cases manually first.
- errors[420].update((
- 'FLOOD_WAIT_X', 'FLOOD_TEST_PHONE_WAIT_X'
- ))
- errors[401].update((
- 'AUTH_KEY_INVALID', 'SESSION_EXPIRED', 'SESSION_REVOKED'
- ))
- errors[303].update((
- 'FILE_MIGRATE_X', 'PHONE_MIGRATE_X',
- 'NETWORK_MIGRATE_X', 'USER_MIGRATE_X'
- ))
- for error_code, method_errors in data['result'].items():
- for error_list in method_errors.values():
- for error in error_list:
- errors[int(error_code)].add(re.sub('_\d+', '_X', error).upper())
-
- # Some errors are in the human result, but not with a code. Assume code 400
- for error in data['human_result']:
- if error[0] != '-' and not error.isdigit():
- error = re.sub('_\d+', '_X', error).upper()
- if not any(error in es for es in errors.values()):
- errors[known_codes.get(error, 400)].add(error)
-
- # Some error codes are not known, so create custom base classes if needed
- needed_base_classes = [
- (e, get_class_name(e)) for e in errors if e not in known_base_classes
- ]
-
- # Prefer the descriptions that are related with Telethon way of coding to
- # those that PWRTelegram's API provides.
- telethon_descriptions = {}
- with open(errors_desc, encoding='utf-8') as f:
- for line in f:
- line = line.strip()
- if line and not line.startswith('#'):
- equal = line.index('=')
- message, description = line[:equal], line[equal + 1:]
- telethon_descriptions[message.rstrip()] = description.lstrip()
-
- # Names for the captures, or 'x' if unknown
- capture_names = {
- 'FloodWaitError': 'seconds',
- 'FloodTestPhoneWaitError': 'seconds',
- 'FileMigrateError': 'new_dc',
- 'NetworkMigrateError': 'new_dc',
- 'PhoneMigrateError': 'new_dc',
- 'UserMigrateError': 'new_dc',
- 'FilePartMissingError': 'which'
- }
-
- # Everything ready, generate the code
- with open(output, 'w', encoding='utf-8') as f:
- f.write(
- 'from .rpc_base_errors import RPCError, BadMessageError, {}\n'.format(
- ", ".join(known_base_classes.values()))
- )
- for code, cls in needed_base_classes:
- f.write(
- '\n\nclass {}(RPCError):\n code = {}\n'.format(cls, code)
- )
-
- patterns = [] # Save this dictionary later in the generated code
- for error_code, error_set in errors.items():
- for error in sorted(error_set):
- description = telethon_descriptions.get(
- error, '\n'.join(data['human_result'].get(
- error, ['No description known.']
- ))
- )
- has_captures = '_X' in error
- if has_captures:
- name = get_class_name(error.replace('_X', ''))
- pattern = error.replace('_X', r'_(\d+)')
- else:
- name, pattern = get_class_name(error), error
-
- patterns.append((pattern, name))
- capture = capture_names.get(name, 'x') if has_captures else None
- # TODO Some errors have the same name but different code,
- # split this across different files?
- write_error(f, error_code, name, description, capture)
-
- f.write('\n\nrpc_errors_all = {\n')
- for pattern, name in patterns:
- f.write(' {}: {},\n'.format(repr(pattern), name))
- f.write('}\n')
-
-
-if __name__ == '__main__':
- if input('generate (y/n)?: ').lower() == 'y':
- generate_code('../telethon/errors/rpc_error_list.py',
- 'errors.json', 'error_descriptions')
- elif input('fetch (y/n)?: ').lower() == 'y':
- fetch_errors('errors.json')
diff --git a/telethon_generator/fetch_errors.py b/telethon_generator/fetch_errors.py
new file mode 100644
index 00000000..e7712efd
--- /dev/null
+++ b/telethon_generator/fetch_errors.py
@@ -0,0 +1,32 @@
+import sys
+import json
+import urllib.request
+
+OUT = 'data/errors.json'
+URL = 'https://rpc.pwrtelegram.xyz/?all'
+
+
+def fetch_errors(output, url=URL):
+ print('Opening a connection to', url, '...')
+ r = urllib.request.urlopen(urllib.request.Request(
+ url, headers={'User-Agent' : 'Mozilla/5.0'}
+ ))
+ print('Checking response...')
+ data = json.loads(
+ r.read().decode(r.info().get_param('charset') or 'utf-8')
+ )
+ if data.get('ok'):
+ print('Response was okay, saving data')
+ with open(output, 'w', encoding='utf-8') as f:
+ json.dump(data, f, sort_keys=True)
+ return True
+ else:
+ print('The data received was not okay:')
+ print(json.dumps(data, indent=4, sort_keys=True))
+ return False
+
+
+if __name__ == '__main__':
+    out = OUT if len(sys.argv) < 2 else sys.argv[1]
+    url = URL if len(sys.argv) < 3 else sys.argv[2]
+ fetch_errors(out, url)
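
Hypothetical usage of the standalone fetcher (it only refreshes the PWRTelegram dump; regeneration still goes through setup.py):

```python
# Run from within telethon_generator/, matching the relative OUT default.
from fetch_errors import fetch_errors

if fetch_errors('data/errors.json'):
    print('errors.json refreshed; run "python3 setup.py gen errors" next')
```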
diff --git a/telethon_generator/generator.py b/telethon_generator/generator.py
new file mode 100644
index 00000000..4314bd26
--- /dev/null
+++ b/telethon_generator/generator.py
@@ -0,0 +1,26 @@
+from telethon_generator.parsers import parse_errors, parse_tl, find_layer
+from telethon_generator.generators import\
+ generate_errors, generate_tlobjects, generate_docs
+
+
+ERRORS_INPUT_JSON = 'data/errors.json'
+ERRORS_INPUT_DESC = 'data/error_descriptions'
+ERRORS_OUTPUT = '../telethon/errors/rpc_error_list.py'
+
+TLOBJECT_INPUT_TL = 'data/scheme.tl'
+TLOBJECT_OUTPUT = '../telethon/tl'
+IMPORT_DEPTH = 2
+
+DOCS_INPUT_RES = 'data/html'
+DOCS_OUTPUT = '../docs'
+
+
+if __name__ == '__main__':
+ tlobjects = list(parse_tl(TLOBJECT_INPUT_TL, ignore_core=True))
+ errors = list(parse_errors(ERRORS_INPUT_JSON, ERRORS_INPUT_DESC))
+ layer = find_layer(TLOBJECT_INPUT_TL)
+
+    generate_tlobjects(tlobjects, layer, IMPORT_DEPTH, TLOBJECT_OUTPUT)
+ with open(ERRORS_OUTPUT, 'w', encoding='utf-8') as file:
+ generate_errors(errors, file)
+
+ generate_docs(tlobjects, errors, layer, DOCS_INPUT_RES, DOCS_OUTPUT)
diff --git a/telethon_generator/generators/__init__.py b/telethon_generator/generators/__init__.py
new file mode 100644
index 00000000..156606e0
--- /dev/null
+++ b/telethon_generator/generators/__init__.py
@@ -0,0 +1,3 @@
+from .errors import generate_errors
+from .tlobject import generate_tlobjects, clean_tlobjects
+from .docs import generate_docs
diff --git a/docs/generate.py b/telethon_generator/generators/docs.py
similarity index 61%
rename from docs/generate.py
rename to telethon_generator/generators/docs.py
index 75ab3091..2da03e32 100755
--- a/docs/generate.py
+++ b/telethon_generator/generators/docs.py
@@ -1,124 +1,74 @@
#!/usr/bin/env python3
+import functools
import os
import re
-import sys
import shutil
-try:
- from .docs_writer import DocsWriter
-except (ImportError, SystemError):
- from docs_writer import DocsWriter
+from collections import defaultdict
-# Small trick so importing telethon_generator works
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
-
-from telethon_generator.parser import TLParser, TLObject
+from ..docs_writer import DocsWriter
+from ..parsers import TLObject
+from ..utils import snake_to_camel_case
-# TLObject -> Python class name
-def get_class_name(tlobject):
- """Gets the class name following the Python style guidelines"""
- # Courtesy of http://stackoverflow.com/a/31531797/4759433
+CORE_TYPES = {
+ 'int', 'long', 'int128', 'int256', 'double',
+ 'vector', 'string', 'bool', 'true', 'bytes', 'date'
+}
+
+
+def _get_file_name(tlobject):
+ """``ClassName -> class_name.html``."""
name = tlobject.name if isinstance(tlobject, TLObject) else tlobject
- result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), name)
-
- # Replace '_' with '' once again to make sure it doesn't appear on the name
- result = result[:1].upper() + result[1:].replace('_', '')
-
- # If it's a function, let it end with "Request" to identify them more easily
- if isinstance(tlobject, TLObject) and tlobject.is_function:
- result += 'Request'
-
- return result
-
-
-# TLObject -> filename
-def get_file_name(tlobject, add_extension=False):
- """Gets the file name in file_name_format.html for the given TLObject.
- Only its name may also be given if the full TLObject is not available"""
- if isinstance(tlobject, TLObject):
- name = tlobject.name
- else:
- name = tlobject
-
# Courtesy of http://stackoverflow.com/a/1176023/4759433
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
- if add_extension:
- return result + '.html'
- else:
- return result
+ return '{}.html'.format(result)
-# TLObject -> from ... import ...
def get_import_code(tlobject):
+ """``TLObject -> from ... import ...``."""
kind = 'functions' if tlobject.is_function else 'types'
ns = '.' + tlobject.namespace if tlobject.namespace else ''
-
return 'from telethon.tl.{}{} import {}'\
- .format(kind, ns, get_class_name(tlobject))
+ .format(kind, ns, tlobject.class_name)
-def get_create_path_for(tlobject):
- """Gets the file path (and creates the parent directories)
- for the given 'tlobject', relative to nothing; only its local path"""
-
- # Determine the output directory
+def _get_create_path_for(root, tlobject):
+ """Creates and returns the path for the given TLObject at root."""
out_dir = 'methods' if tlobject.is_function else 'constructors'
-
if tlobject.namespace:
out_dir = os.path.join(out_dir, tlobject.namespace)
- # Ensure that it exists
+ out_dir = os.path.join(root, out_dir)
os.makedirs(out_dir, exist_ok=True)
-
- # Return the resulting filename
- return os.path.join(out_dir, get_file_name(tlobject, add_extension=True))
+ return os.path.join(out_dir, _get_file_name(tlobject))
-def is_core_type(type_):
- """Returns "true" if the type is considered a core type"""
- return type_.lower() in {
- 'int', 'long', 'int128', 'int256', 'double',
- 'vector', 'string', 'bool', 'true', 'bytes', 'date'
- }
-
-
-def get_path_for_type(type_, relative_to='.'):
- """Similar to getting the path for a TLObject, it might not be possible
- to have the TLObject itself but rather its name (the type);
- this method works in the same way, returning a relative path"""
- if is_core_type(type_):
+def get_path_for_type(root, type_, relative_to='.'):
+ """Similar to `_get_create_path_for` but for only type names."""
+ if type_.lower() in CORE_TYPES:
path = 'index.html#%s' % type_.lower()
-
elif '.' in type_:
- # If it's not a core type, then it has to be a custom Telegram type
namespace, name = type_.split('.')
- path = 'types/%s/%s' % (namespace, get_file_name(name, True))
+ path = 'types/%s/%s' % (namespace, _get_file_name(name))
else:
- path = 'types/%s' % get_file_name(type_, True)
+ path = 'types/%s' % _get_file_name(type_)
- return get_relative_path(path, relative_to)
+ return _get_relative_path(os.path.join(root, path), relative_to)
-# Destination path from the current position -> relative to the given path
-def get_relative_path(destination, relative_to):
- if os.path.isfile(relative_to):
+def _get_relative_path(destination, relative_to, folder=False):
+ """Return the relative path to destination from relative_to."""
+ if not folder:
relative_to = os.path.dirname(relative_to)
return os.path.relpath(destination, start=relative_to)
-def get_relative_paths(original, relative_to):
- """Converts the dictionary of 'original' paths to relative paths
- starting from the given 'relative_to' file"""
- return {k: get_relative_path(v, relative_to) for k, v in original.items()}
-
-
-# Generate a index.html file for the given folder
-def find_title(html_file):
- """Finds the
- with open(html_file) as handle:
- for line in handle:
+def _find_title(html_file):
+ """Finds the for the given HTML file, or (Unknown)."""
+ with open(html_file) as fp:
+ for line in fp:
             if '<title>' in line:
                 # + 7 to skip len('<title>')
                 return line[line.index('<title>') + 7:line.index('</title>')]
@@ -126,10 +76,11 @@ def find_title(html_file):
return '(Unknown)'
-def build_menu(docs, filename, relative_main_index):
+def _build_menu(docs, filename, root, relative_main_index):
"""Builds the menu using the given DocumentWriter up to 'filename',
which must be a file (it cannot be a directory)"""
# TODO Maybe this could be part of DocsWriter itself, "build path menu"
+ filename = _get_relative_path(filename, root)
docs.add_menu('API', relative_main_index)
items = filename.split('/')
@@ -144,9 +95,8 @@ def build_menu(docs, filename, relative_main_index):
docs.end_menu()
-def generate_index(folder, original_paths):
+def _generate_index(folder, original_paths, root):
"""Generates the index file for the specified folder"""
-
# Determine the namespaces listed here (as sub folders)
# and the files (.html files) that we should link to
namespaces = []
@@ -157,27 +107,28 @@ def generate_index(folder, original_paths):
elif item != 'index.html':
files.append(item)
- # We work with relative paths
- paths = get_relative_paths(original_paths, relative_to=folder)
+ paths = {k: _get_relative_path(v, folder, folder=True)
+ for k, v in original_paths.items()}
# Now that everything is setup, write the index.html file
filename = os.path.join(folder, 'index.html')
- with DocsWriter(filename, type_to_path_function=get_path_for_type) as docs:
+ with DocsWriter(filename, type_to_path=get_path_for_type) as docs:
# Title should be the current folder name
docs.write_head(folder.title(), relative_css_path=paths['css'])
docs.set_menu_separator(paths['arrow'])
- build_menu(docs, filename, relative_main_index=paths['index_all'])
-
- docs.write_title(folder.title())
+ _build_menu(docs, filename, root,
+ relative_main_index=paths['index_all'])
+ docs.write_title(_get_relative_path(folder, root, folder=True).title())
if namespaces:
docs.write_title('Namespaces', level=3)
docs.begin_table(4)
namespaces.sort()
for namespace in namespaces:
# For every namespace, also write the index of it
- generate_index(os.path.join(folder, namespace), original_paths)
+ _generate_index(os.path.join(folder, namespace),
+ original_paths, root)
docs.add_row(namespace.title(),
link=os.path.join(namespace, 'index.html'))
@@ -186,7 +137,7 @@ def generate_index(folder, original_paths):
docs.write_title('Available items')
docs.begin_table(2)
- files = [(f, find_title(os.path.join(folder, f))) for f in files]
+ files = [(f, _find_title(os.path.join(folder, f))) for f in files]
files.sort(key=lambda t: t[1])
for file, title in files:
@@ -196,8 +147,8 @@ def generate_index(folder, original_paths):
docs.end_body()
-def get_description(arg):
- """Generates a proper description for the given argument"""
+def _get_description(arg):
+ """Generates a proper description for the given argument."""
desc = []
otherwise = False
if arg.can_be_inferred:
@@ -235,7 +186,7 @@ def get_description(arg):
)
-def copy_replace(src, dst, replacements):
+def _copy_replace(src, dst, replacements):
"""Copies the src file into dst applying the replacements dict"""
with open(src) as infile, open(dst, 'w') as outfile:
outfile.write(re.sub(
@@ -245,10 +196,15 @@ def copy_replace(src, dst, replacements):
))
-def generate_documentation(scheme_file):
- """Generates the documentation HTML files from from scheme.tl to
- /methods and /constructors, etc.
+def _write_html_pages(tlobjects, errors, layer, input_res, output_dir):
"""
+    Generates the documentation HTML files from ``scheme.tl``
+ to ``/methods`` and ``/constructors``, etc.
+ """
+ # Save 'Type: [Constructors]' for use in both:
+ # * Seeing the return type or constructors belonging to the same type.
+ # * Generating the types documentation, showing available constructors.
+ # TODO Tried using 'defaultdict(list)' with strange results, make it work.
original_paths = {
'css': 'css/docs.css',
'arrow': 'img/arrow.svg',
@@ -259,46 +215,46 @@ def generate_documentation(scheme_file):
'index_methods': 'methods/index.html',
'index_constructors': 'constructors/index.html'
}
- tlobjects = tuple(TLParser.parse_file(scheme_file))
+ original_paths = {k: os.path.join(output_dir, v)
+ for k, v in original_paths.items()}
- print('Generating constructors and functions documentation...')
-
- # Save 'Type: [Constructors]' for use in both:
- # * Seeing the return type or constructors belonging to the same type.
- # * Generating the types documentation, showing available constructors.
- # TODO Tried using 'defaultdict(list)' with strange results, make it work.
- tltypes = {}
- tlfunctions = {}
+ type_to_constructors = {}
+ type_to_functions = {}
for tlobject in tlobjects:
- # Select to which dictionary we want to store this type
- dictionary = tlfunctions if tlobject.is_function else tltypes
-
- if tlobject.result in dictionary:
- dictionary[tlobject.result].append(tlobject)
+ d = type_to_functions if tlobject.is_function else type_to_constructors
+ if tlobject.result in d:
+ d[tlobject.result].append(tlobject)
else:
- dictionary[tlobject.result] = [tlobject]
+ d[tlobject.result] = [tlobject]
- for tltype, constructors in tltypes.items():
- tltypes[tltype] = list(sorted(constructors, key=lambda c: c.name))
+ for t, cs in type_to_constructors.items():
+ type_to_constructors[t] = list(sorted(cs, key=lambda c: c.name))
+
+ method_causes_errors = defaultdict(list)
+ for error in errors:
+ for method in error.caused_by:
+ method_causes_errors[method].append(error)
+
+ # Since the output directory is needed everywhere partially apply it now
+ create_path_for = functools.partial(_get_create_path_for, output_dir)
+ path_for_type = functools.partial(get_path_for_type, output_dir)
for tlobject in tlobjects:
- filename = get_create_path_for(tlobject)
+ filename = create_path_for(tlobject)
+ paths = {k: _get_relative_path(v, filename)
+ for k, v in original_paths.items()}
- # Determine the relative paths for this file
- paths = get_relative_paths(original_paths, relative_to=filename)
-
- with DocsWriter(filename, type_to_path_function=get_path_for_type) \
- as docs:
- docs.write_head(
- title=get_class_name(tlobject),
- relative_css_path=paths['css'])
+ with DocsWriter(filename, type_to_path=path_for_type) as docs:
+ docs.write_head(title=tlobject.class_name,
+ relative_css_path=paths['css'])
# Create the menu (path to the current TLObject)
docs.set_menu_separator(paths['arrow'])
- build_menu(docs, filename, relative_main_index=paths['index_all'])
+ _build_menu(docs, filename, output_dir,
+ relative_main_index=paths['index_all'])
# Create the page title
- docs.write_title(get_class_name(tlobject))
+ docs.write_title(tlobject.class_name)
# Write the code definition for this TLObject
docs.write_code(tlobject)
@@ -328,24 +284,24 @@ def generate_documentation(scheme_file):
inner = tlobject.result
docs.begin_table(column_count=1)
- docs.add_row(inner, link=get_path_for_type(
+ docs.add_row(inner, link=path_for_type(
inner, relative_to=filename
))
docs.end_table()
- constructors = tltypes.get(inner, [])
- if not constructors:
+ cs = type_to_constructors.get(inner, [])
+ if not cs:
docs.write_text('This type has no instances available.')
- elif len(constructors) == 1:
+ elif len(cs) == 1:
docs.write_text('This type can only be an instance of:')
else:
docs.write_text('This type can be an instance of either:')
docs.begin_table(column_count=2)
- for constructor in constructors:
- link = get_create_path_for(constructor)
- link = get_relative_path(link, relative_to=filename)
- docs.add_row(get_class_name(constructor), link=link)
+ for constructor in cs:
+ link = create_path_for(constructor)
+ link = _get_relative_path(link, relative_to=filename)
+ docs.add_row(constructor.class_name, link=link)
docs.end_table()
# Return (or similar types) written. Now parameters/members
@@ -375,11 +331,11 @@ def generate_documentation(scheme_file):
else:
docs.add_row(
arg.type, align='center', link=
- get_path_for_type(arg.type, relative_to=filename)
+ path_for_type(arg.type, relative_to=filename)
)
# Add a description for this argument
- docs.add_row(get_description(arg))
+ docs.add_row(_get_description(arg))
docs.end_table()
else:
@@ -388,6 +344,25 @@ def generate_documentation(scheme_file):
else:
docs.write_text('This type has no members.')
+ if tlobject.is_function:
+ docs.write_title('Known RPC errors')
+ errors = method_causes_errors[tlobject.fullname]
+ if not errors:
+ docs.write_text("This request can't cause any RPC error "
+ "as far as we know.")
+ else:
+ docs.write_text(
+ 'This request can cause {} known error{}:'.format(
+ len(errors), '' if len(errors) == 1 else 's'
+ ))
+ docs.begin_table(column_count=2)
+ for error in errors:
+                        docs.add_row('<code>{}</code>'.format(error.name))
+                        docs.add_row('{}.'.format(error.description))
+                    docs.end_table()
+ docs.write_text('You can import these from '
+                                '<code>telethon.errors</code>.')
+
# TODO Bit hacky, make everything like this? (prepending '../')
depth = '../' * (2 if tlobject.namespace else 1)
docs.add_script(src='prependPath = "{}";'.format(depth))
@@ -396,55 +371,54 @@ def generate_documentation(scheme_file):
# Find all the available types (which are not the same as the constructors)
# Each type has a list of constructors associated to it, hence is a map
- print('Generating types documentation...')
- for tltype, constructors in tltypes.items():
- filename = get_path_for_type(tltype)
+ for t, cs in type_to_constructors.items():
+ filename = path_for_type(t)
out_dir = os.path.dirname(filename)
if out_dir:
os.makedirs(out_dir, exist_ok=True)
# Since we don't have access to the full TLObject, split the type
- if '.' in tltype:
- namespace, name = tltype.split('.')
+ if '.' in t:
+ namespace, name = t.split('.')
else:
- namespace, name = None, tltype
+ namespace, name = None, t
- # Determine the relative paths for this file
- paths = get_relative_paths(original_paths, relative_to=out_dir)
+ paths = {k: _get_relative_path(v, out_dir, folder=True)
+ for k, v in original_paths.items()}
- with DocsWriter(filename, type_to_path_function=get_path_for_type) \
- as docs:
+ with DocsWriter(filename, type_to_path=path_for_type) as docs:
docs.write_head(
- title=get_class_name(name),
+ title=snake_to_camel_case(name),
relative_css_path=paths['css'])
docs.set_menu_separator(paths['arrow'])
- build_menu(docs, filename, relative_main_index=paths['index_all'])
+ _build_menu(docs, filename, output_dir,
+ relative_main_index=paths['index_all'])
# Main file title
- docs.write_title(get_class_name(name))
+ docs.write_title(snake_to_camel_case(name))
# List available constructors for this type
docs.write_title('Available constructors', level=3)
- if not constructors:
+ if not cs:
docs.write_text('This type has no constructors available.')
- elif len(constructors) == 1:
+ elif len(cs) == 1:
docs.write_text('This type has one constructor available.')
else:
docs.write_text('This type has %d constructors available.' %
- len(constructors))
+ len(cs))
docs.begin_table(2)
- for constructor in constructors:
+ for constructor in cs:
# Constructor full name
- link = get_create_path_for(constructor)
- link = get_relative_path(link, relative_to=filename)
- docs.add_row(get_class_name(constructor), link=link)
+ link = create_path_for(constructor)
+ link = _get_relative_path(link, relative_to=filename)
+ docs.add_row(constructor.class_name, link=link)
docs.end_table()
# List all the methods which return this type
docs.write_title('Methods returning this type', level=3)
- functions = tlfunctions.get(tltype, [])
+ functions = type_to_functions.get(t, [])
if not functions:
docs.write_text('No method returns this type.')
elif len(functions) == 1:
@@ -457,16 +431,16 @@ def generate_documentation(scheme_file):
docs.begin_table(2)
for func in functions:
- link = get_create_path_for(func)
- link = get_relative_path(link, relative_to=filename)
- docs.add_row(get_class_name(func), link=link)
+ link = create_path_for(func)
+ link = _get_relative_path(link, relative_to=filename)
+ docs.add_row(func.class_name, link=link)
docs.end_table()
# List all the methods which take this type as input
docs.write_title('Methods accepting this type as input', level=3)
other_methods = sorted(
-            (t for t in tlobjects
-             if any(tltype == a.type for a in t.args) and t.is_function),
-            key=lambda t: t.name
+            (u for u in tlobjects
+             if any(t == a.type for a in u.args) and u.is_function),
+            key=lambda u: u.name
)
if not other_methods:
@@ -482,16 +456,16 @@ def generate_documentation(scheme_file):
docs.begin_table(2)
for ot in other_methods:
- link = get_create_path_for(ot)
- link = get_relative_path(link, relative_to=filename)
- docs.add_row(get_class_name(ot), link=link)
+ link = create_path_for(ot)
+ link = _get_relative_path(link, relative_to=filename)
+ docs.add_row(ot.class_name, link=link)
docs.end_table()
# List every other type which has this type as a member
docs.write_title('Other types containing this type', level=3)
other_types = sorted(
-            (t for t in tlobjects
-             if any(tltype == a.type for a in t.args)
-             and not t.is_function
-            ), key=lambda t: t.name
+            (u for u in tlobjects
+             if any(t == a.type for a in u.args)
+             and not u.is_function
+            ), key=lambda u: u.name
)
@@ -509,9 +483,9 @@ def generate_documentation(scheme_file):
docs.begin_table(2)
for ot in other_types:
- link = get_create_path_for(ot)
- link = get_relative_path(link, relative_to=filename)
- docs.add_row(get_class_name(ot), link=link)
+ link = create_path_for(ot)
+ link = _get_relative_path(link, relative_to=filename)
+ docs.add_row(ot.class_name, link=link)
docs.end_table()
docs.end_body()
@@ -519,22 +493,21 @@ def generate_documentation(scheme_file):
# This will be done automatically and not taking into account any extra
# information that we have available, simply a file listing all the others
# accessible by clicking on their title
- print('Generating indices...')
for folder in ['types', 'methods', 'constructors']:
- generate_index(folder, original_paths)
+ _generate_index(os.path.join(output_dir, folder), original_paths,
+ output_dir)
# Write the final core index, the main index for the rest of files
- layer = TLParser.find_layer(scheme_file)
types = set()
methods = []
- constructors = []
+ cs = []
for tlobject in tlobjects:
if tlobject.is_function:
methods.append(tlobject)
else:
- constructors.append(tlobject)
+ cs.append(tlobject)
- if not is_core_type(tlobject.result):
+ if not tlobject.result.lower() in CORE_TYPES:
if re.search('^vector<', tlobject.result, re.IGNORECASE):
types.add(tlobject.result.split('<')[1].strip('>'))
else:
@@ -542,41 +515,43 @@ def generate_documentation(scheme_file):
types = sorted(types)
methods = sorted(methods, key=lambda m: m.name)
- constructors = sorted(constructors, key=lambda c: c.name)
+ cs = sorted(cs, key=lambda c: c.name)
- def fmt(xs):
- ys = {x: get_class_name(x) for x in xs} # cache TLObject: display
- zs = {} # create a dict to hold those which have duplicated keys
- for y in ys.values():
- zs[y] = y in zs
- return ', '.join(
- '"{}.{}"'.format(x.namespace, ys[x])
- if zs[ys[x]] and getattr(x, 'namespace', None)
- else '"{}"'.format(ys[x]) for x in xs
- )
-
- request_names = fmt(methods)
- type_names = fmt(types)
- constructor_names = fmt(constructors)
-
- def fmt(xs, formatter):
- return ', '.join('"{}"'.format(formatter(x)) for x in xs)
-
- request_urls = fmt(methods, get_create_path_for)
- type_urls = fmt(types, get_path_for_type)
- constructor_urls = fmt(constructors, get_create_path_for)
-
- shutil.copy('../res/404.html', original_paths['404'])
- copy_replace('../res/core.html', original_paths['index_all'], {
+ shutil.copy(os.path.join(input_res, '404.html'), original_paths['404'])
+ _copy_replace(os.path.join(input_res, 'core.html'),
+ original_paths['index_all'], {
'{type_count}': len(types),
'{method_count}': len(methods),
'{constructor_count}': len(tlobjects) - len(methods),
'{layer}': layer,
})
+
+ def fmt(xs):
+ zs = {} # create a dict to hold those which have duplicated keys
+ for x in xs:
+ zs[x.class_name] = x.class_name in zs
+ return ', '.join(
+ '"{}.{}"'.format(x.namespace, x.class_name)
+ if zs[x.class_name] and x.namespace
+ else '"{}"'.format(x.class_name) for x in xs
+ )
+
+ request_names = fmt(methods)
+ constructor_names = fmt(cs)
+
+ def fmt(xs, formatter):
+ return ', '.join('"{}"'.format(formatter(x)) for x in xs)
+
+ type_names = fmt(types, formatter=lambda x: x)
+ request_urls = fmt(methods, create_path_for)
+ type_urls = fmt(types, path_for_type)
+ constructor_urls = fmt(cs, create_path_for)
+
os.makedirs(os.path.abspath(os.path.join(
original_paths['search.js'], os.path.pardir
)), exist_ok=True)
- copy_replace('../res/js/search.js', original_paths['search.js'], {
+ _copy_replace(os.path.join(input_res, 'js', 'search.js'),
+ original_paths['search.js'], {
'{request_names}': request_names,
'{type_names}': type_names,
'{constructor_names}': constructor_names,
@@ -585,23 +560,16 @@ def generate_documentation(scheme_file):
'{constructor_urls}': constructor_urls
})
- # Everything done
- print('Documentation generated.')
+
+def _copy_resources(res_dir, out_dir):
+ for dirname, files in [('css', ['docs.css']), ('img', ['arrow.svg'])]:
+ dirpath = os.path.join(out_dir, dirname)
+ os.makedirs(dirpath, exist_ok=True)
+ for file in files:
+ shutil.copy(os.path.join(res_dir, dirname, file), dirpath)
-def copy_resources():
- for d in ('css', 'img'):
- os.makedirs(d, exist_ok=True)
-
- shutil.copy('../res/img/arrow.svg', 'img')
- shutil.copy('../res/css/docs.css', 'css')
-
-
-if __name__ == '__main__':
- os.makedirs('generated', exist_ok=True)
- os.chdir('generated')
- try:
- generate_documentation('../../telethon_generator/scheme.tl')
- copy_resources()
- finally:
- os.chdir(os.pardir)
+def generate_docs(tlobjects, errors, layer, input_res, output_dir):
+ os.makedirs(output_dir, exist_ok=True)
+ _write_html_pages(tlobjects, errors, layer, input_res, output_dir)
+ _copy_resources(input_res, output_dir)
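
Building only the documentation without setup.py mirrors the wiring of `generate()`; a sketch using the data paths introduced by this change, run from the repository root:

```python
from telethon_generator.parsers import parse_errors, parse_tl, find_layer
from telethon_generator.generators import generate_docs

TL = 'telethon_generator/data/scheme.tl'
tlobjects = list(parse_tl(TL, ignore_core=True))
errors = list(parse_errors('telethon_generator/data/errors.json',
                           'telethon_generator/data/error_descriptions'))
generate_docs(tlobjects, errors, find_layer(TL),
              'telethon_generator/data/html', 'docs')
```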
diff --git a/telethon_generator/generators/errors.py b/telethon_generator/generators/errors.py
new file mode 100644
index 00000000..136809a0
--- /dev/null
+++ b/telethon_generator/generators/errors.py
@@ -0,0 +1,52 @@
+import itertools
+
+
+def generate_errors(errors, f):
+ # Exact/regex match to create {CODE: ErrorClassName}
+ exact_match = []
+ regex_match = []
+
+ # Find out what subclasses to import and which to create
+ import_base, create_base = set(), {}
+ for error in errors:
+ if error.subclass_exists:
+ import_base.add(error.subclass)
+ else:
+ create_base[error.subclass] = error.int_code
+
+ if error.has_captures:
+ regex_match.append(error)
+ else:
+ exact_match.append(error)
+
+ # Imports and new subclass creation
+ f.write('from .rpc_base_errors import RPCError, {}\n'
+ .format(", ".join(sorted(import_base))))
+
+ for cls, int_code in sorted(create_base.items(), key=lambda t: t[1]):
+ f.write('\n\nclass {}(RPCError):\n code = {}\n'
+ .format(cls, int_code))
+
+ # Error classes generation
+ for error in errors:
+ f.write('\n\nclass {}({}):\n def __init__(self, **kwargs):\n'
+ ' '.format(error.name, error.subclass))
+
+ if error.has_captures:
+ f.write("self.{} = int(kwargs.get('capture', 0))\n "
+ .format(error.capture_name))
+
+ f.write('super(Exception, self).__init__({}'
+ .format(repr(error.description)))
+
+ if error.has_captures:
+ f.write('.format(self.{})'.format(error.capture_name))
+
+ f.write(')\n')
+
+ # Create the actual {CODE: ErrorClassName} dict once classes are defined
+ # TODO Actually make a difference between regex/exact
+ f.write('\n\nrpc_errors_all = {\n')
+ for error in itertools.chain(regex_match, exact_match):
+ f.write(' {}: {},\n'.format(repr(error.pattern), error.name))
+ f.write('}\n')
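
`generate_errors` only relies on a handful of attributes on each error object (`name`, `subclass`, `subclass_exists`, `int_code`, `has_captures`, `capture_name`, `description`, `pattern`), so a stand-in is enough to preview the output; the real objects come from `parse_errors`. Illustrative only:

```python
import io
from collections import namedtuple

from telethon_generator.generators import generate_errors

FakeError = namedtuple('FakeError', [
    'name', 'subclass', 'subclass_exists', 'int_code',
    'has_captures', 'capture_name', 'description', 'pattern'
])

flood = FakeError('FloodWaitError', 'FloodError', True, 420, True,
                  'seconds', 'Sleep of {} seconds required', r'FLOOD_WAIT_(\d+)')

out = io.StringIO()
generate_errors([flood], out)
print(out.getvalue())  # the import line, the class body and the rpc_errors_all dict
```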
diff --git a/telethon_generator/generators/tlobject.py b/telethon_generator/generators/tlobject.py
new file mode 100644
index 00000000..a2e02efb
--- /dev/null
+++ b/telethon_generator/generators/tlobject.py
@@ -0,0 +1,660 @@
+import functools
+import os
+import re
+import shutil
+import struct
+from collections import defaultdict
+from zlib import crc32
+
+from ..source_builder import SourceBuilder
+from ..utils import snake_to_camel_case
+
+AUTO_GEN_NOTICE = \
+ '"""File generated by TLObjects\' generator. All changes will be ERASED"""'
+
+
+AUTO_CASTS = {
+ 'InputPeer': 'utils.get_input_peer(client.get_input_entity({}))',
+ 'InputChannel': 'utils.get_input_channel(client.get_input_entity({}))',
+ 'InputUser': 'utils.get_input_user(client.get_input_entity({}))',
+ 'InputMedia': 'utils.get_input_media({})',
+ 'InputPhoto': 'utils.get_input_photo({})'
+}
+
+BASE_TYPES = ('string', 'bytes', 'int', 'long', 'int128',
+ 'int256', 'double', 'Bool', 'true', 'date')
+
+
+def _write_modules(out_dir, depth, namespace_tlobjects, type_constructors):
+ # namespace_tlobjects: {'namespace', [TLObject]}
+ os.makedirs(out_dir, exist_ok=True)
+ for ns, tlobjects in namespace_tlobjects.items():
+ file = os.path.join(out_dir, '{}.py'.format(ns or '__init__'))
+ with open(file, 'w', encoding='utf-8') as f,\
+ SourceBuilder(f) as builder:
+ builder.writeln(AUTO_GEN_NOTICE)
+
+ builder.writeln('from {}.tl.tlobject import TLObject', '.' * depth)
+ builder.writeln('from typing import Optional, List, '
+ 'Union, TYPE_CHECKING')
+
+ # Add the relative imports to the namespaces,
+ # unless we already are in a namespace.
+ if not ns:
+ builder.writeln('from . import {}', ', '.join(
+ x for x in namespace_tlobjects.keys() if x
+ ))
+
+ # Import 'os' for those needing access to 'os.urandom()'
+ # Currently only 'random_id' needs 'os' to be imported,
+ # for all those TLObjects with arg.can_be_inferred.
+ builder.writeln('import os')
+
+ # Import struct for the .__bytes__(self) serialization
+ builder.writeln('import struct')
+
+ tlobjects.sort(key=lambda x: x.name)
+
+ type_names = set()
+ type_defs = []
+
+ # Find all the types in this file and generate type definitions
+ # based on the types. The type definitions are written to the
+ # file at the end.
+ for t in tlobjects:
+ if not t.is_function:
+ type_name = t.result
+ if '.' in type_name:
+                        type_name = type_name[type_name.rindex('.') + 1:]
+ if type_name in type_names:
+ continue
+ type_names.add(type_name)
+ constructors = type_constructors[type_name]
+ if not constructors:
+ pass
+ elif len(constructors) == 1:
+ type_defs.append('Type{} = {}'.format(
+ type_name, constructors[0].class_name))
+ else:
+ type_defs.append('Type{} = Union[{}]'.format(
+ type_name, ','.join(c.class_name
+ for c in constructors)))
+
+ imports = {}
+ primitives = ('int', 'long', 'int128', 'int256', 'string',
+ 'date', 'bytes', 'true')
+ # Find all the types in other files that are used in this file
+ # and generate the information required to import those types.
+ for t in tlobjects:
+ for arg in t.args:
+ name = arg.type
+ if not name or name in primitives:
+ continue
+
+ import_space = '{}.tl.types'.format('.' * depth)
+ if '.' in name:
+ namespace = name.split('.')[0]
+ name = name.split('.')[1]
+ import_space += '.{}'.format(namespace)
+
+ if name not in type_names:
+ type_names.add(name)
+ if name == 'date':
+ imports['datetime'] = ['datetime']
+ continue
+ elif import_space not in imports:
+ imports[import_space] = set()
+ imports[import_space].add('Type{}'.format(name))
+
+ # Add imports required for type checking
+ if imports:
+ builder.writeln('if TYPE_CHECKING:')
+ for namespace, names in imports.items():
+ builder.writeln('from {} import {}',
+ namespace, ', '.join(names))
+
+ builder.end_block()
+
+ # Generate the class for every TLObject
+ for t in tlobjects:
+ _write_source_code(t, builder, type_constructors)
+ builder.current_indent = 0
+
+ # Write the type definitions generated earlier.
+ builder.writeln('')
+ for line in type_defs:
+ builder.writeln(line)
+
+
+def _write_source_code(tlobject, builder, type_constructors):
+ """
+ Writes the source code corresponding to the given TLObject
+ by making use of the ``builder`` `SourceBuilder`.
+
+ Additional information such as file path depth and
+ the ``Type: [Constructors]`` must be given for proper
+ importing and documentation strings.
+ """
+ _write_class_init(tlobject, type_constructors, builder)
+ _write_resolve(tlobject, builder)
+ _write_to_dict(tlobject, builder)
+ _write_to_bytes(tlobject, builder)
+ _write_from_reader(tlobject, builder)
+ _write_on_response(tlobject, builder)
+
+
+def _write_class_init(tlobject, type_constructors, builder):
+ builder.writeln()
+ builder.writeln()
+ builder.writeln('class {}(TLObject):', tlobject.class_name)
+
+ # Class-level variable to store its Telegram's constructor ID
+ builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id)
+ builder.writeln('SUBCLASS_OF_ID = {:#x}',
+ crc32(tlobject.result.encode('ascii')))
+ builder.writeln()
+
+ # Convert the args to string parameters, flags having =None
+ args = [(a.name if not a.is_flag and not a.can_be_inferred
+ else '{}=None'.format(a.name)) for a in tlobject.real_args]
+
+ # Write the __init__ function
+ builder.writeln('def __init__({}):', ', '.join(['self'] + args))
+ if tlobject.real_args:
+ # Write the docstring, to know the type of the args
+ builder.writeln('"""')
+ for arg in tlobject.real_args:
+ if not arg.flag_indicator:
+ builder.writeln(':param {} {}:', arg.type_hint(), arg.name)
+ builder.current_indent -= 1 # It will auto-indent (':')
+
+ # We also want to know what type this request returns
+ # or to which type this constructor belongs to
+ builder.writeln()
+ if tlobject.is_function:
+ builder.write(':returns {}: ', tlobject.result)
+ else:
+ builder.write('Constructor for {}: ', tlobject.result)
+
+ constructors = type_constructors[tlobject.result]
+ if not constructors:
+ builder.writeln('This type has no constructors.')
+ elif len(constructors) == 1:
+ builder.writeln('Instance of {}.',
+ constructors[0].class_name)
+ else:
+ builder.writeln('Instance of either {}.', ', '.join(
+ c.class_name for c in constructors))
+
+ builder.writeln('"""')
+
+ builder.writeln('super().__init__()')
+ # Functions have a result object and are confirmed by default
+ if tlobject.is_function:
+ builder.writeln('self.result = None')
+ builder.writeln('self.content_related = True')
+
+ # Set the arguments
+ if tlobject.real_args:
+ builder.writeln()
+
+ for arg in tlobject.real_args:
+ if not arg.can_be_inferred:
+ builder.writeln('self.{0} = {0} # type: {1}',
+ arg.name, arg.type_hint())
+
+ # Currently the only argument that can be
+ # inferred are those called 'random_id'
+ elif arg.name == 'random_id':
+ # Endianness doesn't really matter, and 'big' is shorter
+ code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \
+ .format(8 if arg.type == 'long' else 4)
+
+ if arg.is_vector:
+ # Currently for the case of "messages.forwardMessages"
+ # Ensure we can infer the length from id:Vector<>
+ if not next(a for a in tlobject.real_args
+ if a.name == 'id').is_vector:
+ raise ValueError(
+ 'Cannot infer list of random ids for ', tlobject
+ )
+ code = '[{} for _ in range(len(id))]'.format(code)
+
+ builder.writeln(
+ "self.random_id = random_id if random_id "
+ "is not None else {}", code
+ )
+ else:
+ raise ValueError('Cannot infer a value for ', arg)
+
+ builder.end_block()
+
+
+def _write_resolve(tlobject, builder):
+ if any(arg.type in AUTO_CASTS for arg in tlobject.real_args):
+ builder.writeln('def resolve(self, client, utils):')
+ for arg in tlobject.real_args:
+ ac = AUTO_CASTS.get(arg.type, None)
+ if not ac:
+ continue
+ if arg.is_vector:
+ builder.write('self.{0} = [{1} for _x in self.{0}]',
+ arg.name, ac.format('_x'))
+ else:
+ builder.write('self.{} = {}', arg.name,
+ ac.format('self.' + arg.name))
+ builder.writeln(' if self.{} else None'.format(arg.name)
+ if arg.is_flag else '')
+ builder.end_block()
+
+
+def _write_to_dict(tlobject, builder):
+ builder.writeln('def to_dict(self):')
+ builder.writeln('return {')
+ builder.current_indent += 1
+
+ builder.write("'_': '{}'", tlobject.class_name)
+ for arg in tlobject.real_args:
+ builder.writeln(',')
+ builder.write("'{}': ", arg.name)
+ if arg.type in BASE_TYPES:
+ if arg.is_vector:
+ builder.write('[] if self.{0} is None else self.{0}[:]',
+ arg.name)
+ else:
+ builder.write('self.{}', arg.name)
+ else:
+ if arg.is_vector:
+ builder.write(
+ '[] if self.{0} is None else [None '
+ 'if x is None else x.to_dict() for x in self.{0}]',
+ arg.name
+ )
+ else:
+ builder.write(
+ 'None if self.{0} is None else self.{0}.to_dict()',
+ arg.name
+ )
+
+ builder.writeln()
+ builder.current_indent -= 1
+ builder.writeln("}")
+
+ builder.end_block()
+
+
+def _write_to_bytes(tlobject, builder):
+ builder.writeln('def __bytes__(self):')
+
+ # Some objects require more than one flag parameter to be set
+ # at the same time. In this case, add an assertion.
+ repeated_args = defaultdict(list)
+ for arg in tlobject.args:
+ if arg.is_flag:
+ repeated_args[arg.flag_index].append(arg)
+
+ for ra in repeated_args.values():
+ if len(ra) > 1:
+ cnd1 = ('(self.{0} or self.{0} is not None)'
+ .format(a.name) for a in ra)
+ cnd2 = ('(self.{0} is None or self.{0} is False)'
+ .format(a.name) for a in ra)
+ builder.writeln(
+ "assert ({}) or ({}), '{} parameters must all "
+                "be False-y (like None) or all be True-y'",
+ ' and '.join(cnd1), ' and '.join(cnd2),
+ ', '.join(a.name for a in ra)
+ )
+
+ builder.writeln("return b''.join((")
+ builder.current_indent += 1
+
+ # First constructor code, we already know its bytes
+    builder.writeln('{},', repr(struct.pack('<I', tlobject.id)))
+
+
+def _write_on_response(tlobject, builder):
+    # Currently only un-boxed responses are Vector<int>/Vector<long>.
+ # If this weren't the case, we should check upper case after
+ # max(index('<'), index('.')) (and if it is, it's boxed, so return).
+ m = re.match(r'Vector<(int|long)>', tlobject.result)
+ if not m:
+ return
+
+ builder.end_block()
+ builder.writeln('def on_response(self, reader):')
+ builder.writeln('reader.read_int() # Vector ID')
+ builder.writeln('self.result = [reader.read_{}() '
+ 'for _ in range(reader.read_int())]', m.group(1))
+
+
+def _write_arg_to_bytes(builder, arg, args, name=None):
+ """
+ Writes the .__bytes__() code for the given argument
+ :param builder: The source code builder
+ :param arg: The argument to write
+    :param args: All the other arguments of the same TLObject, which are
+                 required to determine the value of the flags
+    :param name: The name of the argument. Defaults to "self.argname".
+                 This argument is optional; it is overridden when
+                 writing the items inside a Vector<>.
+ """
+ if arg.generic_definition:
+ return # Do nothing, this only specifies a later type
+
+ if name is None:
+ name = 'self.{}'.format(arg.name)
+
+ # The argument may be a flag, only write if it's not None AND
+ # if it's not a True type.
+ # True types are not actually sent, but instead only used to
+ # determine the flags.
+ if arg.is_flag:
+ if arg.type == 'true':
+ return # Exit, since True type is never written
+ elif arg.is_vector:
+ # Vector flags are special since they consist of 3 values,
+ # so we need an extra join here. Note that empty vector flags
+ # should NOT be sent either!
+ builder.write("b'' if {0} is None or {0} is False "
+ "else b''.join((", name)
+ else:
+ builder.write("b'' if {0} is None or {0} is False "
+ "else (", name)
+
+ if arg.is_vector:
+ if arg.use_vector_id:
+ # vector code, unsigned 0x1cb5c415 as little endian
+ builder.write(r"b'\x15\xc4\xb5\x1c',")
+
+ builder.write("struct.pack('3.5 feature, so add another join.
+ builder.write("b''.join(")
+
+ # Temporary disable .is_vector, not to enter this if again
+ # Also disable .is_flag since it's not needed per element
+ old_flag = arg.is_flag
+ arg.is_vector = arg.is_flag = False
+ _write_arg_to_bytes(builder, arg, args, name='x')
+ arg.is_vector = True
+ arg.is_flag = old_flag
+
+ builder.write(' for x in {})', name)
+
+ elif arg.flag_indicator:
+ # Calculate the flags with those items which are not None
+ if not any(f.is_flag for f in args):
+ # There's a flag indicator, but no flag arguments so it's 0
+ builder.write(r"b'\0\0\0\0'")
+ else:
+ builder.write("struct.pack('
+ """
+
+ if arg.generic_definition:
+ return # Do nothing, this only specifies a later type
+
+ # The argument may be a flag, only write that flag was given!
+ was_flag = False
+ if arg.is_flag:
+ # Treat 'true' flags as a special case, since they're true if
+ # they're set, and nothing else needs to actually be read.
+ if 'true' == arg.type:
+ builder.writeln('{} = bool(flags & {})',
+ name, 1 << arg.flag_index)
+ return
+
+ was_flag = True
+ builder.writeln('if flags & {}:', 1 << arg.flag_index)
+ # Temporary disable .is_flag not to enter this if
+ # again when calling the method recursively
+ arg.is_flag = False
+
+ if arg.is_vector:
+ if arg.use_vector_id:
+ # We have to read the vector's constructor ID
+ builder.writeln("reader.read_int()")
+
+ builder.writeln('{} = []', name)
+ builder.writeln('for _ in range(reader.read_int()):')
+ # Temporary disable .is_vector, not to enter this if again
+ arg.is_vector = False
+ _write_arg_read_code(builder, arg, args, name='_x')
+ builder.writeln('{}.append(_x)', name)
+ arg.is_vector = True
+
+ elif arg.flag_indicator:
+ # Read the flags, which will indicate what items we should read next
+ builder.writeln('flags = reader.read_int()')
+ builder.writeln()
+
+ elif 'int' == arg.type:
+ builder.writeln('{} = reader.read_int()', name)
+
+ elif 'long' == arg.type:
+ builder.writeln('{} = reader.read_long()', name)
+
+ elif 'int128' == arg.type:
+ builder.writeln('{} = reader.read_large_int(bits=128)', name)
+
+ elif 'int256' == arg.type:
+ builder.writeln('{} = reader.read_large_int(bits=256)', name)
+
+ elif 'double' == arg.type:
+ builder.writeln('{} = reader.read_double()', name)
+
+ elif 'string' == arg.type:
+ builder.writeln('{} = reader.tgread_string()', name)
+
+ elif 'Bool' == arg.type:
+ builder.writeln('{} = reader.tgread_bool()', name)
+
+ elif 'true' == arg.type:
+ # Arbitrary not-None value, don't actually read "true" flags
+ builder.writeln('{} = True', name)
+
+ elif 'bytes' == arg.type:
+ builder.writeln('{} = reader.tgread_bytes()', name)
+
+ elif 'date' == arg.type: # Custom format
+ builder.writeln('{} = reader.tgread_date()', name)
+
+ else:
+ # Else it may be a custom type
+ if not arg.skip_constructor_id:
+ builder.writeln('{} = reader.tgread_object()', name)
+ else:
+ # Import the correct type inline to avoid cyclic imports.
+ # There may be better solutions so that we can just access
+ # all the types before the files have been parsed, but I
+ # don't know of any.
+ sep_index = arg.type.find('.')
+ if sep_index == -1:
+ ns, t = '.', arg.type
+ else:
+ ns, t = '.' + arg.type[:sep_index], arg.type[sep_index+1:]
+ class_name = snake_to_camel_case(t)
+
+ # There would be no need to import the type if we're in the
+ # file with the same namespace, but since it does no harm
+            # and we don't have information about such a thing in
+            # this method, we just ignore that case.
+ builder.writeln('from {} import {}', ns, class_name)
+ builder.writeln('{} = {}.from_reader(reader)',
+ name, class_name)
+
+ # End vector and flag blocks if required (if we opened them before)
+ if arg.is_vector:
+ builder.end_block()
+
+ if was_flag:
+ builder.current_indent -= 1
+ builder.writeln('else:')
+ builder.writeln('{} = None', name)
+ builder.current_indent -= 1
+ # Restore .is_flag
+ arg.is_flag = True
+
+
+def _write_all_tlobjects(tlobjects, layer, builder):
+ builder.writeln(AUTO_GEN_NOTICE)
+ builder.writeln()
+
+ builder.writeln('from . import types, functions')
+ builder.writeln()
+
+ # Create a constant variable to indicate which layer this is
+ builder.writeln('LAYER = {}', layer)
+ builder.writeln()
+
+ # Then create the dictionary containing constructor_id: class
+ builder.writeln('tlobjects = {')
+ builder.current_indent += 1
+
+ # Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
+ for tlobject in tlobjects:
+ builder.write('{:#010x}: ', tlobject.id)
+ builder.write('functions' if tlobject.is_function else 'types')
+ if tlobject.namespace:
+ builder.write('.' + tlobject.namespace)
+
+ builder.writeln('.{},', tlobject.class_name)
+
+ builder.current_indent -= 1
+ builder.writeln('}')
+
+
+def generate_tlobjects(tlobjects, layer, import_depth, output_dir):
+ get_file = functools.partial(os.path.join, output_dir)
+ os.makedirs(get_file('functions'), exist_ok=True)
+ os.makedirs(get_file('types'), exist_ok=True)
+
+ # Group everything by {namespace: [tlobjects]} to generate __init__.py
+ namespace_functions = defaultdict(list)
+ namespace_types = defaultdict(list)
+
+ # Group {type: [constructors]} to generate the documentation
+ type_constructors = defaultdict(list)
+ for tlobject in tlobjects:
+ if tlobject.is_function:
+ namespace_functions[tlobject.namespace].append(tlobject)
+ else:
+ namespace_types[tlobject.namespace].append(tlobject)
+ type_constructors[tlobject.result].append(tlobject)
+
+ _write_modules(get_file('functions'), import_depth,
+ namespace_functions, type_constructors)
+ _write_modules(get_file('types'), import_depth,
+ namespace_types, type_constructors)
+
+ filename = os.path.join(get_file('all_tlobjects.py'))
+ with open(filename, 'w', encoding='utf-8') as file:
+ with SourceBuilder(file) as builder:
+ _write_all_tlobjects(tlobjects, layer, builder)
+
+
+def clean_tlobjects(output_dir):
+ get_file = functools.partial(os.path.join, output_dir)
+ for d in ('functions', 'types'):
+ d = get_file(d)
+ if os.path.isdir(d):
+ shutil.rmtree(d)
+
+ tl = get_file('all_tlobjects.py')
+ if os.path.isfile(tl):
+ os.remove(tl)
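
For reference, generate_tlobjects() above finishes by writing all_tlobjects.py, which is nothing more than the LAYER constant plus a flat lookup table from constructor ID to generated class, so the binary reader can map an incoming constructor ID straight to the class that knows how to read it. A minimal sketch of the emitted module's shape, with placeholder IDs, layer number and example classes rather than real scheme values:

    """File generated by TLObjects' generator. All changes will be ERASED"""

    from . import types, functions

    LAYER = 73  # placeholder layer number

    tlobjects = {
        0x1a2b3c4f: types.InputPeerEmpty,                   # placeholder ID
        0x2b3c4d5e: functions.messages.SendMessageRequest,  # placeholder ID
    }
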
diff --git a/telethon_generator/parser/__init__.py b/telethon_generator/parser/__init__.py
deleted file mode 100644
index 6f1a2a9d..00000000
--- a/telethon_generator/parser/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from .source_builder import SourceBuilder
-from .tl_parser import TLParser
-from .tl_object import TLObject
diff --git a/telethon_generator/parser/tl_object.py b/telethon_generator/parser/tl_object.py
deleted file mode 100644
index 0e0045d7..00000000
--- a/telethon_generator/parser/tl_object.py
+++ /dev/null
@@ -1,323 +0,0 @@
-import re
-from zlib import crc32
-
-
-class TLObject:
- """.tl core types IDs (such as vector, booleans, etc.)"""
- CORE_TYPES = (
- 0xbc799737, # boolFalse#bc799737 = Bool;
- 0x997275b5, # boolTrue#997275b5 = Bool;
- 0x3fedd339, # true#3fedd339 = True;
- 0x1cb5c415, # vector#1cb5c415 {t:Type} # [ t ] = Vector t;
- )
-
- def __init__(self, fullname, object_id, args, result, is_function):
- """
- Initializes a new TLObject, given its properties.
- Usually, this will be called from `from_tl` instead
- :param fullname: The fullname of the TL object (namespace.name)
- The namespace can be omitted
- :param object_id: The hexadecimal string representing the object ID
- :param args: The arguments, if any, of the TL object
- :param result: The result type of the TL object
- :param is_function: Is the object a function or a type?
- """
- # The name can or not have a namespace
- if '.' in fullname:
- self.namespace = fullname.split('.')[0]
- self.name = fullname.split('.')[1]
- else:
- self.namespace = None
- self.name = fullname
-
- self.args = args
- self.result = result
- self.is_function = is_function
-
- # The ID should be an hexadecimal string or None to be inferred
- if object_id is None:
- self.id = self.infer_id()
- else:
- self.id = int(object_id, base=16)
- assert self.id == self.infer_id(),\
- 'Invalid inferred ID for ' + repr(self)
-
- @staticmethod
- def from_tl(tl, is_function):
- """Returns a TL object from the given TL scheme line"""
-
- # Regex to match the whole line
- match = re.match(r'''
- ^ # We want to match from the beginning to the end
- ([\w.]+) # The .tl object can contain alpha_name or namespace.alpha_name
- (?:
- \# # After the name, comes the ID of the object
- ([0-9a-f]+) # The constructor ID is in hexadecimal form
- )? # If no constructor ID was given, CRC32 the 'tl' to determine it
-
- (?:\s # After that, we want to match its arguments (name:type)
- {? # For handling the start of the '{X:Type}' case
- \w+ # The argument name will always be an alpha-only name
- : # Then comes the separator between name:type
- [\w\d<>#.?!]+ # The type is slightly more complex, since it's alphanumeric and it can
- # also have Vector, flags:# and flags.0?default, plus :!X as type
- }? # For handling the end of the '{X:Type}' case
- )* # Match 0 or more arguments
- \s # Leave a space between the arguments and the equal
- =
- \s # Leave another space between the equal and the result
- ([\w\d<>#.?]+) # The result can again be as complex as any argument type
- ;$ # Finally, the line should always end with ;
- ''', tl, re.IGNORECASE | re.VERBOSE)
-
- if match is None:
- # Probably "vector#1cb5c415 {t:Type} # [ t ] = Vector t;"
- raise ValueError('Cannot parse TLObject', tl)
-
- # Sub-regex to match the arguments (sadly, it cannot be embedded in the first regex)
- args_match = re.findall(r'''
- ({)? # We may or may not capture the opening brace
- (\w+) # First we capture any alpha name with length 1 or more
- : # Which is separated from its type by a colon
- ([\w\d<>#.?!]+) # The type is slightly more complex, since it's alphanumeric and it can
- # also have Vector, flags:# and flags.0?default, plus :!X as type
- (})? # We may or not capture the closing brace
- ''', tl, re.IGNORECASE | re.VERBOSE)
-
- # Retrieve the matched arguments
- args = [TLArg(name, arg_type, brace != '')
- for brace, name, arg_type, _ in args_match]
-
- # And initialize the TLObject
- return TLObject(
- fullname=match.group(1),
- object_id=match.group(2),
- args=args,
- result=match.group(3),
- is_function=is_function)
-
- def class_name(self):
- """Gets the class name following the Python style guidelines"""
- return self.class_name_for(self.name, self.is_function)
-
- @staticmethod
- def class_name_for(typename, is_function=False):
- """Gets the class name following the Python style guidelines"""
- # Courtesy of http://stackoverflow.com/a/31531797/4759433
- result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), typename)
- result = result[:1].upper() + result[1:].replace('_', '')
- # If it's a function, let it end with "Request" to identify them
- if is_function:
- result += 'Request'
- return result
-
- def sorted_args(self):
- """Returns the arguments properly sorted and ready to plug-in
- into a Python's method header (i.e., flags and those which
- can be inferred will go last so they can default =None)
- """
- return sorted(self.args,
- key=lambda x: x.is_flag or x.can_be_inferred)
-
- def is_core_type(self):
- """Determines whether the TLObject is a "core type"
- (and thus should be embedded in the generated code) or not"""
- return self.id in TLObject.CORE_TYPES
-
- def __repr__(self, ignore_id=False):
- fullname = ('{}.{}'.format(self.namespace, self.name)
- if self.namespace is not None else self.name)
-
- if getattr(self, 'id', None) is None or ignore_id:
- hex_id = ''
- else:
- # Skip 0x and add 0's for padding
- hex_id = '#' + hex(self.id)[2:].rjust(8, '0')
-
- if self.args:
- args = ' ' + ' '.join([repr(arg) for arg in self.args])
- else:
- args = ''
-
- return '{}{}{} = {}'.format(fullname, hex_id, args, self.result)
-
- def infer_id(self):
- representation = self.__repr__(ignore_id=True)
-
- # Clean the representation
- representation = representation\
- .replace(':bytes ', ':string ')\
- .replace('?bytes ', '?string ')\
- .replace('<', ' ').replace('>', '')\
- .replace('{', '').replace('}', '')
-
- representation = re.sub(
- r' \w+:flags\.\d+\?true',
- r'',
- representation
- )
- return crc32(representation.encode('ascii'))
-
- def __str__(self):
- fullname = ('{}.{}'.format(self.namespace, self.name)
- if self.namespace is not None else self.name)
-
- # Some arguments are not valid for being represented, such as the flag indicator or generic definition
- # (these have no explicit values until used)
- valid_args = [arg for arg in self.args
- if not arg.flag_indicator and not arg.generic_definition]
-
- args = ', '.join(['{}={{}}'.format(arg.name) for arg in valid_args])
-
- # Since Python's default representation for lists is using repr(), we need to str() manually on every item
- args_format = ', '.join(
- ['str(self.{})'.format(arg.name) if not arg.is_vector else
- 'None if not self.{0} else [str(_) for _ in self.{0}]'.format(
- arg.name) for arg in valid_args])
-
- return ("'({} (ID: {}) = ({}))'.format({})"
- .format(fullname, hex(self.id), args, args_format))
-
-
-class TLArg:
- def __init__(self, name, arg_type, generic_definition):
- """
- Initializes a new .tl argument
- :param name: The name of the .tl argument
- :param arg_type: The type of the .tl argument
- :param generic_definition: Is the argument a generic definition?
- (i.e. {X:Type})
- """
- if name == 'self': # This very only name is restricted
- self.name = 'is_self'
- else:
- self.name = name
-
- # Default values
- self.is_vector = False
- self.is_flag = False
- self.skip_constructor_id = False
- self.flag_index = -1
-
- # Special case: some types can be inferred, which makes it
- # less annoying to type. Currently the only type that can
- # be inferred is if the name is 'random_id', to which a
- # random ID will be assigned if left as None (the default)
- self.can_be_inferred = name == 'random_id'
-
- # The type can be an indicator that other arguments will be flags
- if arg_type == '#':
- self.flag_indicator = True
- self.type = None
- self.is_generic = False
- else:
- self.flag_indicator = False
- self.is_generic = arg_type.startswith('!')
- # Strip the exclamation mark always to have only the name
- self.type = arg_type.lstrip('!')
-
- # The type may be a flag (flags.IDX?REAL_TYPE)
- # Note that 'flags' is NOT the flags name; this is determined by a previous argument
- # However, we assume that the argument will always be called 'flags'
- flag_match = re.match(r'flags.(\d+)\?([\w<>.]+)', self.type)
- if flag_match:
- self.is_flag = True
- self.flag_index = int(flag_match.group(1))
- # Update the type to match the exact type, not the "flagged" one
- self.type = flag_match.group(2)
-
- # Then check if the type is a Vector
- vector_match = re.match(r'[Vv]ector<([\w\d.]+)>', self.type)
- if vector_match:
- self.is_vector = True
-
- # If the type's first letter is not uppercase, then
- # it is a constructor and we use (read/write) its ID
- # as pinpointed on issue #81.
- self.use_vector_id = self.type[0] == 'V'
-
- # Update the type to match the one inside the vector
- self.type = vector_match.group(1)
-
- # See use_vector_id. An example of such case is ipPort in
- # help.configSpecial
- if self.type.split('.')[-1][0].islower():
- self.skip_constructor_id = True
-
- # The name may contain "date" in it, if this is the case and the type is "int",
- # we can safely assume that this should be treated as a "date" object.
- # Note that this is not a valid Telegram object, but it's easier to work with
- if self.type == 'int' and (
- re.search(r'(\b|_)date\b', name) or
- name in ('expires', 'expires_at', 'was_online')):
- self.type = 'date'
-
- self.generic_definition = generic_definition
-
- def doc_type_hint(self):
- result = {
- 'int': 'int',
- 'long': 'int',
- 'int128': 'int',
- 'int256': 'int',
- 'string': 'str',
- 'date': 'datetime.datetime | None', # None date = 0 timestamp
- 'bytes': 'bytes',
- 'true': 'bool',
- }.get(self.type, self.type)
- if self.is_vector:
- result = 'list[{}]'.format(result)
- if self.is_flag and self.type != 'date':
- result += ' | None'
-
- return result
-
- def python_type_hint(self):
- type = self.type
- if '.' in type:
- type = type.split('.')[1]
- result = {
- 'int': 'int',
- 'long': 'int',
- 'int128': 'int',
- 'int256': 'int',
- 'string': 'str',
- 'date': 'Optional[datetime]', # None date = 0 timestamp
- 'bytes': 'bytes',
- 'true': 'bool',
- }.get(type, "Type{}".format(type))
- if self.is_vector:
- result = 'List[{}]'.format(result)
- if self.is_flag and type != 'date':
- result = 'Optional[{}]'.format(result)
-
- return result
-
- def __str__(self):
- # Find the real type representation by updating it as required
- real_type = self.type
- if self.flag_indicator:
- real_type = '#'
-
- if self.is_vector:
- if self.use_vector_id:
- real_type = 'Vector<{}>'.format(real_type)
- else:
- real_type = 'vector<{}>'.format(real_type)
-
- if self.is_generic:
- real_type = '!{}'.format(real_type)
-
- if self.is_flag:
- real_type = 'flags.{}?{}'.format(self.flag_index, real_type)
-
- if self.generic_definition:
- return '{{{}:{}}}'.format(self.name, real_type)
- else:
- return '{}:{}'.format(self.name, real_type)
-
- def __repr__(self):
- # Get rid of our special type
- return str(self)\
- .replace(':date', ':int')\
- .replace('?date', '?int')
diff --git a/telethon_generator/parser/tl_parser.py b/telethon_generator/parser/tl_parser.py
deleted file mode 100644
index 8c24cbf4..00000000
--- a/telethon_generator/parser/tl_parser.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import re
-
-from .tl_object import TLObject
-
-
-class TLParser:
- """Class used to parse .tl files"""
-
- @staticmethod
- def parse_file(file_path, ignore_core=False):
- """This method yields TLObjects from a given .tl file"""
-
- with open(file_path, encoding='utf-8') as file:
- # Start by assuming that the next found line won't
- # be a function (and will hence be a type)
- is_function = False
-
- # Read all the lines from the .tl file
- for line in file:
- # Strip comments from the line
- comment_index = line.find('//')
- if comment_index != -1:
- line = line[:comment_index]
-
- line = line.strip()
- if line:
- # Check whether the line is a type change
- # (types <-> functions) or not
- match = re.match('---(\w+)---', line)
- if match:
- following_types = match.group(1)
- is_function = following_types == 'functions'
-
- else:
- try:
- result = TLObject.from_tl(line, is_function)
- if not ignore_core or not result.is_core_type():
- yield result
- except ValueError as e:
- if 'vector#1cb5c415' not in str(e):
- raise
-
- @staticmethod
- def find_layer(file_path):
- """Finds the layer used on the specified scheme.tl file"""
- layer_regex = re.compile(r'^//\s*LAYER\s*(\d+)$')
- with open(file_path, encoding='utf-8') as file:
- for line in file:
- match = layer_regex.match(line)
- if match:
- return int(match.group(1))
diff --git a/telethon_generator/parsers/__init__.py b/telethon_generator/parsers/__init__.py
new file mode 100644
index 00000000..9034450e
--- /dev/null
+++ b/telethon_generator/parsers/__init__.py
@@ -0,0 +1,2 @@
+from .errors import Error, parse_errors
+from .tlobject import TLObject, parse_tl, find_layer
diff --git a/telethon_generator/parsers/errors.py b/telethon_generator/parsers/errors.py
new file mode 100644
index 00000000..d5e53eda
--- /dev/null
+++ b/telethon_generator/parsers/errors.py
@@ -0,0 +1,146 @@
+import json
+import re
+from collections import defaultdict
+
+from ..utils import snake_to_camel_case
+
+# Core base classes depending on the integer error code
+KNOWN_BASE_CLASSES = {
+ 303: 'InvalidDCError',
+ 400: 'BadRequestError',
+ 401: 'UnauthorizedError',
+ 403: 'ForbiddenError',
+ 404: 'NotFoundError',
+ 406: 'AuthKeyError',
+ 420: 'FloodError',
+ 500: 'ServerError',
+}
+
+# The API doesn't return the code for some (vital) errors. They are
+# all assumed to be 400, except these well-known ones that aren't.
+KNOWN_CODES = {
+ 'ACTIVE_USER_REQUIRED': 401,
+ 'AUTH_KEY_UNREGISTERED': 401,
+ 'USER_DEACTIVATED': 401
+}
+
+# Give better semantic names to some captures
+CAPTURE_NAMES = {
+ 'FloodWaitError': 'seconds',
+ 'FloodTestPhoneWaitError': 'seconds',
+ 'FileMigrateError': 'new_dc',
+ 'NetworkMigrateError': 'new_dc',
+ 'PhoneMigrateError': 'new_dc',
+ 'UserMigrateError': 'new_dc',
+ 'FilePartMissingError': 'which'
+}
+
+
+def _get_class_name(error_code):
+ """
+ Gets the corresponding class name for the given error code,
+    which may be either an integer (thus a base error name) or a str.
+ """
+ if isinstance(error_code, int):
+ return KNOWN_BASE_CLASSES.get(
+ error_code, 'RPCError' + str(error_code).replace('-', 'Neg')
+ )
+
+ return snake_to_camel_case(
+ error_code.replace('FIRSTNAME', 'FIRST_NAME').lower(), suffix='Error')
+
+
+class Error:
+ def __init__(self, int_code, str_code, description, caused_by):
+ # TODO Some errors have the same str_code but different int_code
+ # Should these be split into different files or doesn't really matter?
+ # Telegram isn't exactly consistent with returned errors anyway.
+ self.int_code = int_code
+ self.str_code = str_code
+ self.subclass = _get_class_name(int_code)
+ self.subclass_exists = int_code in KNOWN_BASE_CLASSES
+ self.description = description
+ self.caused_by = list(sorted(caused_by))
+
+ self.has_captures = '_X' in str_code
+ if self.has_captures:
+ self.name = _get_class_name(str_code.replace('_X', ''))
+ self.pattern = str_code.replace('_X', r'_(\d+)')
+ self.capture_name = CAPTURE_NAMES.get(self.name, 'x')
+ else:
+ self.name = _get_class_name(str_code)
+ self.pattern = str_code
+ self.capture_name = None
+
+
+def parse_errors(json_file, descriptions_file):
+ """
+ Parses the given JSON file in the following format:
+ {
+ "ok": true,
+ "human_result": {"int_code": ["descriptions"]},
+ "result": {"int_code": {"full_method_name": ["str_error"]}}
+ }
+
+ The descriptions file, which has precedence over the JSON's human_result,
+ should have the following format:
+ # comment
+ str_error=Description
+
+    This function yields `Error` instances as a result.
+ """
+ with open(json_file, encoding='utf-8') as f:
+ data = json.load(f)
+
+ errors = defaultdict(set)
+ error_to_method = defaultdict(set)
+ # PWRTelegram's API doesn't return all errors, which we do need here.
+ # Add some special known-cases manually first.
+ errors[420].update((
+ 'FLOOD_WAIT_X', 'FLOOD_TEST_PHONE_WAIT_X'
+ ))
+ errors[401].update((
+ 'AUTH_KEY_INVALID', 'SESSION_EXPIRED', 'SESSION_REVOKED'
+ ))
+ errors[303].update((
+ 'FILE_MIGRATE_X', 'PHONE_MIGRATE_X',
+ 'NETWORK_MIGRATE_X', 'USER_MIGRATE_X'
+ ))
+ for int_code, method_errors in data['result'].items():
+ for method, error_list in method_errors.items():
+ for error in error_list:
+ error = re.sub('_\d+', '_X', error).upper()
+ errors[int(int_code)].add(error)
+ error_to_method[error].add(method)
+
+ # Some errors are in the human result, but not with a code. Assume 400
+ for error in data['human_result']:
+ if error[0] != '-' and not error.isdigit():
+ error = re.sub('_\d+', '_X', error).upper()
+ if not any(error in es for es in errors.values()):
+ errors[KNOWN_CODES.get(error, 400)].add(error)
+
+    # Prefer the descriptions that are related to Telethon's way of coding
+ # to those that PWRTelegram's API provides.
+ telethon_descriptions = {}
+ with open(descriptions_file, encoding='utf-8') as f:
+ for line in f:
+ line = line.strip()
+ if line and not line.startswith('#'):
+ equal = line.index('=')
+ message, description = line[:equal], line[equal + 1:]
+ telethon_descriptions[message.rstrip()] = description.lstrip()
+
+ for int_code, error_set in errors.items():
+ for str_code in sorted(error_set):
+ description = telethon_descriptions.get(
+ str_code, '\n'.join(data['human_result'].get(
+ str_code, ['No description known']
+ ))
+ )
+ yield Error(
+ int_code=int_code,
+ str_code=str_code,
+ description=description,
+ caused_by=error_to_method[str_code]
+ )
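
As a rough usage sketch, the parser above can be exercised on its own; this assumes it is run from the repository root so the data files added by this patch are found, and the printed values follow directly from the Error constructor and the hard-coded 420 entries above:

    from telethon_generator.parsers import parse_errors

    errors = list(parse_errors(
        'telethon_generator/data/errors.json',
        'telethon_generator/data/error_descriptions'
    ))

    flood = next(e for e in errors if e.str_code == 'FLOOD_WAIT_X')
    print(flood.name)          # FloodWaitError
    print(flood.subclass)      # FloodError (from int_code 420)
    print(flood.capture_name)  # seconds
    print(flood.pattern)       # FLOOD_WAIT_(\d+)
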
diff --git a/telethon_generator/parsers/tlobject.py b/telethon_generator/parsers/tlobject.py
new file mode 100644
index 00000000..a5e5945a
--- /dev/null
+++ b/telethon_generator/parsers/tlobject.py
@@ -0,0 +1,274 @@
+import re
+from zlib import crc32
+
+from ..utils import snake_to_camel_case
+
+CORE_TYPES = (
+ 0xbc799737, # boolFalse#bc799737 = Bool;
+ 0x997275b5, # boolTrue#997275b5 = Bool;
+ 0x3fedd339, # true#3fedd339 = True;
+ 0x1cb5c415, # vector#1cb5c415 {t:Type} # [ t ] = Vector t;
+)
+
+
+class TLObject:
+ def __init__(self, fullname, object_id, args, result, is_function):
+ """
+ Initializes a new TLObject, given its properties.
+
+ :param fullname: The fullname of the TL object (namespace.name)
+ The namespace can be omitted.
+ :param object_id: The hexadecimal string representing the object ID
+ :param args: The arguments, if any, of the TL object
+ :param result: The result type of the TL object
+ :param is_function: Is the object a function or a type?
+ """
+        # The name may or may not have a namespace
+ self.fullname = fullname
+ if '.' in fullname:
+ self.namespace, self.name = fullname.split('.', maxsplit=1)
+ else:
+ self.namespace, self.name = None, fullname
+
+ self.args = args
+ self.result = result
+ self.is_function = is_function
+ self.id = None
+ if object_id is None:
+ self.id = self.infer_id()
+ else:
+ self.id = int(object_id, base=16)
+ assert self.id == self.infer_id(),\
+ 'Invalid inferred ID for ' + repr(self)
+
+ self.class_name = snake_to_camel_case(
+ self.name, suffix='Request' if self.is_function else '')
+
+ self.real_args = list(a for a in self.sorted_args() if not
+ (a.flag_indicator or a.generic_definition))
+
+ def sorted_args(self):
+ """Returns the arguments properly sorted and ready to plug-in
+ into a Python's method header (i.e., flags and those which
+ can be inferred will go last so they can default =None)
+ """
+ return sorted(self.args,
+ key=lambda x: x.is_flag or x.can_be_inferred)
+
+ def __repr__(self, ignore_id=False):
+ if self.id is None or ignore_id:
+ hex_id = ''
+ else:
+ hex_id = '#{:08x}'.format(self.id)
+
+ if self.args:
+ args = ' ' + ' '.join([repr(arg) for arg in self.args])
+ else:
+ args = ''
+
+ return '{}{}{} = {}'.format(self.fullname, hex_id, args, self.result)
+
+ def infer_id(self):
+ representation = self.__repr__(ignore_id=True)
+ representation = representation\
+ .replace(':bytes ', ':string ')\
+ .replace('?bytes ', '?string ')\
+ .replace('<', ' ').replace('>', '')\
+ .replace('{', '').replace('}', '')
+
+ representation = re.sub(
+ r' \w+:flags\.\d+\?true',
+ r'',
+ representation
+ )
+ return crc32(representation.encode('ascii'))
+
+
+class TLArg:
+ def __init__(self, name, arg_type, generic_definition):
+ """
+ Initializes a new .tl argument
+ :param name: The name of the .tl argument
+ :param arg_type: The type of the .tl argument
+ :param generic_definition: Is the argument a generic definition?
+ (i.e. {X:Type})
+ """
+ self.name = 'is_self' if name == 'self' else name
+
+ # Default values
+ self.is_vector = False
+ self.is_flag = False
+ self.skip_constructor_id = False
+ self.flag_index = -1
+
+ # Special case: some types can be inferred, which makes it
+        # less annoying to type. Currently the only argument that
+        # can be inferred is 'random_id', to which a random ID
+        # will be assigned if left as None (the default)
+ self.can_be_inferred = name == 'random_id'
+
+ # The type can be an indicator that other arguments will be flags
+ if arg_type == '#':
+ self.flag_indicator = True
+ self.type = None
+ self.is_generic = False
+ else:
+ self.flag_indicator = False
+ self.is_generic = arg_type.startswith('!')
+ # Strip the exclamation mark always to have only the name
+ self.type = arg_type.lstrip('!')
+
+ # The type may be a flag (flags.IDX?REAL_TYPE)
+ # Note that 'flags' is NOT the flags name; this
+ # is determined by a previous argument
+ # However, we assume that the argument will always be called 'flags'
+ flag_match = re.match(r'flags.(\d+)\?([\w<>.]+)', self.type)
+ if flag_match:
+ self.is_flag = True
+ self.flag_index = int(flag_match.group(1))
+ # Update the type to match the exact type, not the "flagged" one
+ self.type = flag_match.group(2)
+
+ # Then check if the type is a Vector
+ vector_match = re.match(r'[Vv]ector<([\w\d.]+)>', self.type)
+ if vector_match:
+ self.is_vector = True
+
+ # If the type's first letter is not uppercase, then
+ # it is a constructor and we use (read/write) its ID
+ # as pinpointed on issue #81.
+ self.use_vector_id = self.type[0] == 'V'
+
+ # Update the type to match the one inside the vector
+ self.type = vector_match.group(1)
+
+ # See use_vector_id. An example of such case is ipPort in
+ # help.configSpecial
+ if self.type.split('.')[-1][0].islower():
+ self.skip_constructor_id = True
+
+ # The name may contain "date" in it, if this is the case and the type is "int",
+ # we can safely assume that this should be treated as a "date" object.
+ # Note that this is not a valid Telegram object, but it's easier to work with
+ if self.type == 'int' and (
+ re.search(r'(\b|_)date\b', name) or
+ name in ('expires', 'expires_at', 'was_online')):
+ self.type = 'date'
+
+ self.generic_definition = generic_definition
+
+ def type_hint(self):
+ type = self.type
+ if '.' in type:
+ type = type.split('.')[1]
+ result = {
+ 'int': 'int',
+ 'long': 'int',
+ 'int128': 'int',
+ 'int256': 'int',
+ 'string': 'str',
+ 'date': 'Optional[datetime]', # None date = 0 timestamp
+ 'bytes': 'bytes',
+ 'true': 'bool',
+ }.get(type, "Type{}".format(type))
+ if self.is_vector:
+ result = 'List[{}]'.format(result)
+ if self.is_flag and type != 'date':
+ result = 'Optional[{}]'.format(result)
+
+ return result
+
+ def __str__(self):
+ # Find the real type representation by updating it as required
+ real_type = self.type
+ if self.flag_indicator:
+ real_type = '#'
+
+ if self.is_vector:
+ if self.use_vector_id:
+ real_type = 'Vector<{}>'.format(real_type)
+ else:
+ real_type = 'vector<{}>'.format(real_type)
+
+ if self.is_generic:
+ real_type = '!{}'.format(real_type)
+
+ if self.is_flag:
+ real_type = 'flags.{}?{}'.format(self.flag_index, real_type)
+
+ if self.generic_definition:
+ return '{{{}:{}}}'.format(self.name, real_type)
+ else:
+ return '{}:{}'.format(self.name, real_type)
+
+ def __repr__(self):
+ return str(self).replace(':date', ':int').replace('?date', '?int')
+
+
+def _from_line(line, is_function):
+ match = re.match(
+ r'^([\w.]+)' # 'name'
+ r'(?:#([0-9a-fA-F]+))?' # '#optionalcode'
+ r'(?:\s{?\w+:[\w\d<>#.?!]+}?)*' # '{args:.0?type}'
+ r'\s=\s' # ' = '
+ r'([\w\d<>#.?]+);$', # ';'
+ line
+ )
+ if match is None:
+ # Probably "vector#1cb5c415 {t:Type} # [ t ] = Vector t;"
+ raise ValueError('Cannot parse TLObject {}'.format(line))
+
+ args_match = re.findall(
+ r'({)?'
+ r'(\w+)'
+ r':'
+ r'([\w\d<>#.?!]+)'
+ r'}?',
+ line
+ )
+ return TLObject(
+ fullname=match.group(1),
+ object_id=match.group(2),
+ result=match.group(3),
+ is_function=is_function,
+ args=[TLArg(name, arg_type, brace != '')
+ for brace, name, arg_type in args_match]
+ )
+
+
+def parse_tl(file_path, ignore_core=False):
+ """This method yields TLObjects from a given .tl file."""
+ with open(file_path, encoding='utf-8') as file:
+ is_function = False
+ for line in file:
+ comment_index = line.find('//')
+ if comment_index != -1:
+ line = line[:comment_index]
+
+ line = line.strip()
+ if not line:
+ continue
+
+ match = re.match('---(\w+)---', line)
+ if match:
+ following_types = match.group(1)
+ is_function = following_types == 'functions'
+ continue
+
+ try:
+ result = _from_line(line, is_function)
+ if not ignore_core or result.id not in CORE_TYPES:
+ yield result
+ except ValueError as e:
+ if 'vector#1cb5c415' not in str(e):
+ raise
+
+
+def find_layer(file_path):
+ """Finds the layer used on the specified scheme.tl file."""
+ layer_regex = re.compile(r'^//\s*LAYER\s*(\d+)$')
+ with open(file_path, encoding='utf-8') as file:
+ for line in file:
+ match = layer_regex.match(line)
+ if match:
+ return int(match.group(1))
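
A quick, self-contained sketch of the new parser in action; it writes a throwaway one-line scheme (the definition and layer number below are only illustrative) because parse_tl() and find_layer() operate on file paths:

    from telethon_generator.parsers import parse_tl, find_layer

    with open('mini.tl', 'w', encoding='utf-8') as f:
        f.write('---functions---\n')
        f.write('users.getUsers id:Vector<InputUser> = Vector<User>;\n')
        f.write('// LAYER 73\n')

    request = next(parse_tl('mini.tl', ignore_core=True))
    print(request.class_name)         # GetUsersRequest
    print(request.is_function)        # True
    print(request.args[0].is_vector)  # True
    print(hex(request.id))            # inferred via crc32 of the definition
    print(find_layer('mini.tl'))      # 73
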
diff --git a/telethon_generator/parser/source_builder.py b/telethon_generator/source_builder.py
similarity index 100%
rename from telethon_generator/parser/source_builder.py
rename to telethon_generator/source_builder.py
diff --git a/telethon_generator/tl_generator.py b/telethon_generator/tl_generator.py
deleted file mode 100644
index 85ee98a0..00000000
--- a/telethon_generator/tl_generator.py
+++ /dev/null
@@ -1,762 +0,0 @@
-import os
-import re
-import shutil
-import struct
-from zlib import crc32
-from collections import defaultdict
-
-from .parser import SourceBuilder, TLParser, TLObject
-AUTO_GEN_NOTICE = \
- '"""File generated by TLObjects\' generator. All changes will be ERASED"""'
-
-
-AUTO_CASTS = {
- 'InputPeer': 'utils.get_input_peer(client.get_input_entity({}))',
- 'InputChannel': 'utils.get_input_channel(client.get_input_entity({}))',
- 'InputUser': 'utils.get_input_user(client.get_input_entity({}))',
- 'InputMedia': 'utils.get_input_media({})',
- 'InputPhoto': 'utils.get_input_photo({})'
-}
-
-
-class TLGenerator:
- def __init__(self, output_dir):
- self.output_dir = output_dir
-
- def _get_file(self, *paths):
- """Wrapper around ``os.path.join()`` with output as first path."""
- return os.path.join(self.output_dir, *paths)
-
- def _rm_if_exists(self, filename):
- """Recursively deletes the given filename if it exists."""
- file = self._get_file(filename)
- if os.path.exists(file):
- if os.path.isdir(file):
- shutil.rmtree(file)
- else:
- os.remove(file)
-
- def tlobjects_exist(self):
- """
- Determines whether the TLObjects were previously
- generated (hence exist) or not.
- """
- return os.path.isfile(self._get_file('all_tlobjects.py'))
-
- def clean_tlobjects(self):
- """Cleans the automatically generated TLObjects from disk."""
- for name in ('functions', 'types', 'all_tlobjects.py'):
- self._rm_if_exists(name)
-
- def generate_tlobjects(self, scheme_file, import_depth):
- """
- Generates all the TLObjects from the ``scheme_file`` to
- ``tl/functions`` and ``tl/types``.
- """
-
- # First ensure that the required parent directories exist
- os.makedirs(self._get_file('functions'), exist_ok=True)
- os.makedirs(self._get_file('types'), exist_ok=True)
-
- # Step 0: Cache the parsed file on a tuple
- tlobjects = tuple(TLParser.parse_file(scheme_file, ignore_core=True))
-
- # Step 1: Group everything by {namespace: [tlobjects]} so we can
- # easily generate __init__.py files with all the TLObjects on them.
- namespace_functions = defaultdict(list)
- namespace_types = defaultdict(list)
-
- # Make use of this iteration to also store 'Type: [Constructors]',
- # used when generating the documentation for the classes.
- type_constructors = defaultdict(list)
- for tlobject in tlobjects:
- if tlobject.is_function:
- namespace_functions[tlobject.namespace].append(tlobject)
- else:
- namespace_types[tlobject.namespace].append(tlobject)
- type_constructors[tlobject.result].append(tlobject)
-
- # Step 2: Generate the actual code
- self._write_init_py(
- self._get_file('functions'), import_depth,
- namespace_functions, type_constructors
- )
- self._write_init_py(
- self._get_file('types'), import_depth,
- namespace_types, type_constructors
- )
-
- # Step 4: Once all the objects have been generated,
- # we can now group them in a single file
- filename = os.path.join(self._get_file('all_tlobjects.py'))
- with open(filename, 'w', encoding='utf-8') as file,\
- SourceBuilder(file) as builder:
- builder.writeln(AUTO_GEN_NOTICE)
- builder.writeln()
-
- builder.writeln('from . import types, functions')
- builder.writeln()
-
- # Create a constant variable to indicate which layer this is
- builder.writeln('LAYER = {}', TLParser.find_layer(scheme_file))
- builder.writeln()
-
- # Then create the dictionary containing constructor_id: class
- builder.writeln('tlobjects = {')
- builder.current_indent += 1
-
- # Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
- for tlobject in tlobjects:
- builder.write('{:#010x}: ', tlobject.id)
- builder.write('functions' if tlobject.is_function else 'types')
- if tlobject.namespace:
- builder.write('.' + tlobject.namespace)
-
- builder.writeln('.{},', tlobject.class_name())
-
- builder.current_indent -= 1
- builder.writeln('}')
-
- @staticmethod
- def _write_init_py(out_dir, depth, namespace_tlobjects, type_constructors):
- # namespace_tlobjects: {'namespace', [TLObject]}
- os.makedirs(out_dir, exist_ok=True)
- for ns, tlobjects in namespace_tlobjects.items():
- file = os.path.join(out_dir, ns + '.py' if ns else '__init__.py')
- with open(file, 'w', encoding='utf-8') as f, \
- SourceBuilder(f) as builder:
- builder.writeln(AUTO_GEN_NOTICE)
-
- # Both types and functions inherit from the TLObject class
- # so they all can be serialized and sent, however, only the
- # functions are "content_related".
- builder.writeln(
- 'from {}.tl.tlobject import TLObject', '.' * depth
- )
- builder.writeln('from typing import Optional, List, '
- 'Union, TYPE_CHECKING')
-
- # Add the relative imports to the namespaces,
- # unless we already are in a namespace.
- if not ns:
- builder.writeln('from . import {}', ', '.join(
- x for x in namespace_tlobjects.keys() if x
- ))
-
- # Import 'os' for those needing access to 'os.urandom()'
- # Currently only 'random_id' needs 'os' to be imported,
- # for all those TLObjects with arg.can_be_inferred.
- builder.writeln('import os')
-
- # Import struct for the .__bytes__(self) serialization
- builder.writeln('import struct')
-
- tlobjects.sort(key=lambda x: x.name)
-
- type_names = set()
- type_defs = []
-
- # Find all the types in this file and generate type definitions
- # based on the types. The type definitions are written to the
- # file at the end.
- for t in tlobjects:
- if not t.is_function:
- type_name = t.result
- if '.' in type_name:
- type_name = type_name[type_name.rindex('.'):]
- if type_name in type_names:
- continue
- type_names.add(type_name)
- constructors = type_constructors[type_name]
- if not constructors:
- pass
- elif len(constructors) == 1:
- type_defs.append('Type{} = {}'.format(
- type_name, constructors[0].class_name()))
- else:
- type_defs.append('Type{} = Union[{}]'.format(
- type_name, ','.join(c.class_name()
- for c in constructors)))
-
- imports = {}
- primitives = ('int', 'long', 'int128', 'int256', 'string',
- 'date', 'bytes', 'true')
- # Find all the types in other files that are used in this file
- # and generate the information required to import those types.
- for t in tlobjects:
- for arg in t.args:
- name = arg.type
- if not name or name in primitives:
- continue
-
- import_space = '{}.tl.types'.format('.' * depth)
- if '.' in name:
- namespace = name.split('.')[0]
- name = name.split('.')[1]
- import_space += '.{}'.format(namespace)
-
- if name not in type_names:
- type_names.add(name)
- if name == 'date':
- imports['datetime'] = ['datetime']
- continue
- elif import_space not in imports:
- imports[import_space] = set()
- imports[import_space].add('Type{}'.format(name))
-
- # Add imports required for type checking
- if imports:
- builder.writeln('if TYPE_CHECKING:')
- for namespace, names in imports.items():
- builder.writeln('from {} import {}',
- namespace, ', '.join(names))
-
- builder.end_block()
-
- # Generate the class for every TLObject
- for t in tlobjects:
- TLGenerator._write_source_code(
- t, builder, depth, type_constructors
- )
- builder.current_indent = 0
-
- # Write the type definitions generated earlier.
- builder.writeln('')
- for line in type_defs:
- builder.writeln(line)
-
- @staticmethod
- def _write_source_code(tlobject, builder, depth, type_constructors):
- """
- Writes the source code corresponding to the given TLObject
- by making use of the ``builder`` `SourceBuilder`.
-
- Additional information such as file path depth and
- the ``Type: [Constructors]`` must be given for proper
- importing and documentation strings.
- """
- builder.writeln()
- builder.writeln()
- builder.writeln('class {}(TLObject):', tlobject.class_name())
-
- # Class-level variable to store its Telegram's constructor ID
- builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id)
- builder.writeln('SUBCLASS_OF_ID = {:#x}',
- crc32(tlobject.result.encode('ascii')))
- builder.writeln()
-
- # Flag arguments must go last
- args = [
- a for a in tlobject.sorted_args()
- if not a.flag_indicator and not a.generic_definition
- ]
-
- # Convert the args to string parameters, flags having =None
- args = [
- (a.name if not a.is_flag and not a.can_be_inferred
- else '{}=None'.format(a.name))
- for a in args
- ]
-
- # Write the __init__ function
- if args:
- builder.writeln('def __init__(self, {}):', ', '.join(args))
- else:
- builder.writeln('def __init__(self):')
-
- # Now update args to have the TLObject arguments, _except_
- # those which are calculated on send or ignored, this is
- # flag indicator and generic definitions.
- #
- # We don't need the generic definitions in Python
- # because arguments can be any type
- args = [arg for arg in tlobject.args
- if not arg.flag_indicator and
- not arg.generic_definition]
-
- if args:
- # Write the docstring, to know the type of the args
- builder.writeln('"""')
- for arg in args:
- if not arg.flag_indicator:
- builder.writeln(':param {} {}:',
- arg.doc_type_hint(), arg.name)
- builder.current_indent -= 1 # It will auto-indent (':')
-
- # We also want to know what type this request returns
- # or to which type this constructor belongs to
- builder.writeln()
- if tlobject.is_function:
- builder.write(':returns {}: ', tlobject.result)
- else:
- builder.write('Constructor for {}: ', tlobject.result)
-
- constructors = type_constructors[tlobject.result]
- if not constructors:
- builder.writeln('This type has no constructors.')
- elif len(constructors) == 1:
- builder.writeln('Instance of {}.',
- constructors[0].class_name())
- else:
- builder.writeln('Instance of either {}.', ', '.join(
- c.class_name() for c in constructors))
-
- builder.writeln('"""')
-
- builder.writeln('super().__init__()')
- # Functions have a result object and are confirmed by default
- if tlobject.is_function:
- builder.writeln('self.result = None')
- builder.writeln(
- 'self.content_related = True')
-
- # Set the arguments
- if args:
- # Leave an empty line if there are any args
- builder.writeln()
-
- for arg in args:
- if not arg.can_be_inferred:
- builder.writeln('self.{0} = {0} # type: {1}',
- arg.name, arg.python_type_hint())
- continue
-
- # Currently the only argument that can be
- # inferred are those called 'random_id'
- if arg.name == 'random_id':
- # Endianness doesn't really matter, and 'big' is shorter
- code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \
- .format(8 if arg.type == 'long' else 4)
-
- if arg.is_vector:
- # Currently for the case of "messages.forwardMessages"
- # Ensure we can infer the length from id:Vector<>
- if not next(
- a for a in args if a.name == 'id').is_vector:
- raise ValueError(
- 'Cannot infer list of random ids for ', tlobject
- )
- code = '[{} for _ in range(len(id))]'.format(code)
-
- builder.writeln(
- "self.random_id = random_id if random_id "
- "is not None else {}", code
- )
- else:
- raise ValueError('Cannot infer a value for ', arg)
-
- builder.end_block()
-
- # Write the resolve(self, client, utils) method
- if any(arg.type in AUTO_CASTS for arg in args):
- builder.writeln('def resolve(self, client, utils):')
- for arg in args:
- ac = AUTO_CASTS.get(arg.type, None)
- if ac:
- TLGenerator._write_self_assign(builder, arg, ac)
- builder.end_block()
-
- # Write the to_dict(self) method
- builder.writeln('def to_dict(self):')
- builder.writeln('return {')
- builder.current_indent += 1
-
- base_types = ('string', 'bytes', 'int', 'long', 'int128',
- 'int256', 'double', 'Bool', 'true', 'date')
-
- builder.write("'_': '{}'", tlobject.class_name())
- for arg in args:
- builder.writeln(',')
- builder.write("'{}': ", arg.name)
- if arg.type in base_types:
- if arg.is_vector:
- builder.write('[] if self.{0} is None else self.{0}[:]',
- arg.name)
- else:
- builder.write('self.{}', arg.name)
- else:
- if arg.is_vector:
- builder.write(
- '[] if self.{0} is None else [None '
- 'if x is None else x.to_dict() for x in self.{0}]',
- arg.name
- )
- else:
- builder.write(
- 'None if self.{0} is None else self.{0}.to_dict()',
- arg.name
- )
-
- builder.writeln()
- builder.current_indent -= 1
- builder.writeln("}")
-
- builder.end_block()
-
- # Write the .__bytes__() function
- builder.writeln('def __bytes__(self):')
-
- # Some objects require more than one flag parameter to be set
- # at the same time. In this case, add an assertion.
- repeated_args = defaultdict(list)
- for arg in tlobject.args:
- if arg.is_flag:
- repeated_args[arg.flag_index].append(arg)
-
- for ra in repeated_args.values():
- if len(ra) > 1:
- cnd1 = ('(self.{0} or self.{0} is not None)'
- .format(a.name) for a in ra)
- cnd2 = ('(self.{0} is None or self.{0} is False)'
- .format(a.name) for a in ra)
- builder.writeln(
- "assert ({}) or ({}), '{} parameters must all "
- "be False-y (like None) or all me True-y'",
- ' and '.join(cnd1), ' and '.join(cnd2),
- ', '.join(a.name for a in ra)
- )
-
- builder.writeln("return b''.join((")
- builder.current_indent += 1
-
- # First constructor code, we already know its bytes
-        builder.writeln('{},', repr(struct.pack('<I', tlobject.id)))
- # or a namespace, and the Vector may have a not-boxed type. For this
- # reason we find whatever index, '<' or '.'. If neither are present
- # we will get -1, and the 0th char is always upper case thus works.
- # For Vector types and namespaces, it will check in the right place.
- check_after = max(type_.find('<'), type_.find('.'))
- return type_[check_after + 1].isupper()
-
- @staticmethod
- def _write_self_assign(builder, arg, get_input_code):
- """Writes self.arg = input.format(self.arg), considering vectors."""
- if arg.is_vector:
- builder.write('self.{0} = [{1} for _x in self.{0}]',
- arg.name, get_input_code.format('_x'))
- else:
- builder.write('self.{} = {}',
- arg.name, get_input_code.format('self.' + arg.name))
-
- builder.writeln(
- ' if self.{} else None'.format(arg.name) if arg.is_flag else ''
- )
-
- @staticmethod
- def get_file_name(tlobject, add_extension=False):
- """Gets the file name in file_name_format.py for the given TLObject"""
-
- # Courtesy of http://stackoverflow.com/a/1176023/4759433
- s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name)
- result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
- if add_extension:
- return result + '.py'
- else:
- return result
-
- @staticmethod
- def write_to_bytes(builder, arg, args, name=None):
- """
- Writes the .__bytes__() code for the given argument
- :param builder: The source code builder
- :param arg: The argument to write
- :param args: All the other arguments in TLObject same __bytes__.
- This is required to determine the flags value
- :param name: The name of the argument. Defaults to "self.argname"
- This argument is an option because it's required when
- writing Vectors<>
- """
- if arg.generic_definition:
- return # Do nothing, this only specifies a later type
-
- if name is None:
- name = 'self.{}'.format(arg.name)
-
- # The argument may be a flag, only write if it's not None AND
- # if it's not a True type.
- # True types are not actually sent, but instead only used to
- # determine the flags.
- if arg.is_flag:
- if arg.type == 'true':
- return # Exit, since True type is never written
- elif arg.is_vector:
- # Vector flags are special since they consist of 3 values,
- # so we need an extra join here. Note that empty vector flags
- # should NOT be sent either!
- builder.write("b'' if {0} is None or {0} is False "
- "else b''.join((", name)
- else:
- builder.write("b'' if {0} is None or {0} is False "
- "else (", name)
-
- if arg.is_vector:
- if arg.use_vector_id:
- # vector code, unsigned 0x1cb5c415 as little endian
- builder.write(r"b'\x15\xc4\xb5\x1c',")
-
- builder.write("struct.pack('3.5 feature, so add another join.
- builder.write("b''.join(")
-
- # Temporary disable .is_vector, not to enter this if again
- # Also disable .is_flag since it's not needed per element
- old_flag = arg.is_flag
- arg.is_vector = arg.is_flag = False
- TLGenerator.write_to_bytes(builder, arg, args, name='x')
- arg.is_vector = True
- arg.is_flag = old_flag
-
- builder.write(' for x in {})', name)
-
- elif arg.flag_indicator:
- # Calculate the flags with those items which are not None
- if not any(f.is_flag for f in args):
- # There's a flag indicator, but no flag arguments so it's 0
- builder.write(r"b'\0\0\0\0'")
- else:
- builder.write("struct.pack('
- """
-
- if arg.generic_definition:
- return # Do nothing, this only specifies a later type
-
- # The argument may be a flag, only write that flag was given!
- was_flag = False
- if arg.is_flag:
- # Treat 'true' flags as a special case, since they're true if
- # they're set, and nothing else needs to actually be read.
- if 'true' == arg.type:
- builder.writeln('{} = bool(flags & {})',
- name, 1 << arg.flag_index)
- return
-
- was_flag = True
- builder.writeln('if flags & {}:', 1 << arg.flag_index)
- # Temporary disable .is_flag not to enter this if
- # again when calling the method recursively
- arg.is_flag = False
-
- if arg.is_vector:
- if arg.use_vector_id:
- # We have to read the vector's constructor ID
- builder.writeln("reader.read_int()")
-
- builder.writeln('{} = []', name)
- builder.writeln('for _ in range(reader.read_int()):')
- # Temporary disable .is_vector, not to enter this if again
- arg.is_vector = False
- TLGenerator.write_read_code(builder, arg, args, name='_x')
- builder.writeln('{}.append(_x)', name)
- arg.is_vector = True
-
- elif arg.flag_indicator:
- # Read the flags, which will indicate what items we should read next
- builder.writeln('flags = reader.read_int()')
- builder.writeln()
-
- elif 'int' == arg.type:
- builder.writeln('{} = reader.read_int()', name)
-
- elif 'long' == arg.type:
- builder.writeln('{} = reader.read_long()', name)
-
- elif 'int128' == arg.type:
- builder.writeln('{} = reader.read_large_int(bits=128)', name)
-
- elif 'int256' == arg.type:
- builder.writeln('{} = reader.read_large_int(bits=256)', name)
-
- elif 'double' == arg.type:
- builder.writeln('{} = reader.read_double()', name)
-
- elif 'string' == arg.type:
- builder.writeln('{} = reader.tgread_string()', name)
-
- elif 'Bool' == arg.type:
- builder.writeln('{} = reader.tgread_bool()', name)
-
- elif 'true' == arg.type:
- # Arbitrary not-None value, don't actually read "true" flags
- builder.writeln('{} = True', name)
-
- elif 'bytes' == arg.type:
- builder.writeln('{} = reader.tgread_bytes()', name)
-
- elif 'date' == arg.type: # Custom format
- builder.writeln('{} = reader.tgread_date()', name)
-
- else:
- # Else it may be a custom type
- if not arg.skip_constructor_id:
- builder.writeln('{} = reader.tgread_object()', name)
- else:
- # Import the correct type inline to avoid cyclic imports.
- # There may be better solutions so that we can just access
- # all the types before the files have been parsed, but I
- # don't know of any.
- sep_index = arg.type.find('.')
- if sep_index == -1:
- ns, t = '.', arg.type
- else:
- ns, t = '.' + arg.type[:sep_index], arg.type[sep_index+1:]
- class_name = TLObject.class_name_for(t)
-
- # There would be no need to import the type if we're in the
- # file with the same namespace, but since it does no harm
- # and we don't have information about such thing in the
- # method we just ignore that case.
- builder.writeln('from {} import {}', ns, class_name)
- builder.writeln('{} = {}.from_reader(reader)',
- name, class_name)
-
- # End vector and flag blocks if required (if we opened them before)
- if arg.is_vector:
- builder.end_block()
-
- if was_flag:
- builder.current_indent -= 1
- builder.writeln('else:')
- builder.writeln('{} = None', name)
- builder.current_indent -= 1
- # Restore .is_flag
- arg.is_flag = True
-
- @staticmethod
- def write_request_result_code(builder, tlobject):
- """
- Writes the receive code for the given function
-
- :param builder: The source code builder
- :param tlobject: The TLObject for which the 'self.result = '
- will be written
- """
- if tlobject.result.startswith('Vector<'):
- # Vector results are a bit special since they can also be composed
- # of integer values and such; however, the result of requests is
- # not parsed as arguments are and it's a bit harder to tell which
- # is which.
-            if tlobject.result == 'Vector<int>':
- builder.writeln('reader.read_int() # Vector ID')
- builder.writeln('count = reader.read_int()')
- builder.writeln(
- 'self.result = [reader.read_int() for _ in range(count)]'
- )
-            elif tlobject.result == 'Vector<long>':
- builder.writeln('reader.read_int() # Vector ID')
- builder.writeln('count = reader.read_long()')
- builder.writeln(
- 'self.result = [reader.read_long() for _ in range(count)]'
- )
- else:
- builder.writeln('self.result = reader.tgread_vector()')
- else:
- builder.writeln('self.result = reader.tgread_object()')
diff --git a/telethon_generator/utils.py b/telethon_generator/utils.py
new file mode 100644
index 00000000..9889803f
--- /dev/null
+++ b/telethon_generator/utils.py
@@ -0,0 +1,8 @@
+import re
+
+
+def snake_to_camel_case(name, suffix=None):
+ # Courtesy of http://stackoverflow.com/a/31531797/4759433
+ result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), name)
+ result = result[:1].upper() + result[1:].replace('_', '')
+ return result + suffix if suffix else result
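
Finally, a quick behavioural check of the shared helper above, which both the error and TLObject parsers rely on for class naming (assumed to be run from the repository root so the package import resolves):

    from telethon_generator.utils import snake_to_camel_case

    print(snake_to_camel_case('send_message', suffix='Request'))  # SendMessageRequest
    print(snake_to_camel_case('flood_wait', suffix='Error'))      # FloodWaitError
    print(snake_to_camel_case('updateShortMessage'))              # UpdateShortMessage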