Telethon/telethon_generator/tl_generator.py
Lonami b68772aab5 Fixed JSON encoding and decoding for lists, added empty() method
The new empty() method retrieves an empty instance of the
given object, with all the attributes set to None
2016-09-25 10:50:48 +02:00

465 lines
21 KiB
Python
Executable File

import os
import re
import shutil
from parser import SourceBuilder, TLParser
def get_output_path(normal_path):
return os.path.join('../telethon/tl', normal_path)
class TLGenerator:
@staticmethod
def tlobjects_exist():
"""Determines whether the TLObjects were previously generated (hence exist) or not"""
return os.path.isfile(get_output_path('all_tlobjects.py'))
@staticmethod
def clean_tlobjects():
"""Cleans the automatically generated TLObjects from disk"""
if os.path.isdir(get_output_path('functions')):
shutil.rmtree(get_output_path('functions'))
if os.path.isdir(get_output_path('types')):
shutil.rmtree(get_output_path('types'))
if os.path.isfile(get_output_path('all_tlobjects.py')):
os.remove(get_output_path('all_tlobjects.py'))
@staticmethod
def generate_tlobjects(scheme_file):
"""Generates all the TLObjects from scheme.tl to tl/functions and tl/types"""
# First ensure that the required parent directories exist
os.makedirs(get_output_path('functions'), exist_ok=True)
os.makedirs(get_output_path('types'), exist_ok=True)
# Store the parsed file in a tuple for iterating it more than once
tlobjects = tuple(TLParser.parse_file(scheme_file))
for tlobject in tlobjects:
# Determine the output directory and create it
out_dir = get_output_path('functions' if tlobject.is_function
else 'types')
if tlobject.namespace:
out_dir = os.path.join(out_dir, tlobject.namespace)
os.makedirs(out_dir, exist_ok=True)
# Also add this object to __init__.py, so we can import the whole packet at once
init_py = os.path.join(out_dir, '__init__.py')
with open(init_py, 'a', encoding='utf-8') as file:
with SourceBuilder(file) as builder:
builder.writeln('from {} import {}'.format(
TLGenerator.get_full_file_name(tlobject),
TLGenerator.get_class_name(tlobject)))
# Create the file for this TLObject
filename = os.path.join(out_dir, TLGenerator.get_file_name(tlobject, add_extension=True))
with open(filename, 'w', encoding='utf-8') as file:
# Let's build the source code!
with SourceBuilder(file) as builder:
# Both types and functions inherit from MTProtoRequest so they all can be sent
builder.writeln('from telethon.tl.mtproto_request import MTProtoRequest')
builder.writeln('import json')
builder.writeln()
builder.writeln()
builder.writeln('class {}(MTProtoRequest):'.format(TLGenerator.get_class_name(tlobject)))
# Write the original .tl definition, along with a "generated automatically" message
builder.writeln('"""Class generated by TLObjects\' generator. '
'All changes will be ERASED. Original .tl definition below.')
builder.writeln('{}"""'.format(repr(tlobject)))
builder.writeln()
# First sort the arguments so that those not being a flag come first
args = sorted([arg for arg in tlobject.args if not arg.flag_indicator],
key=lambda x: x.is_flag)
# Then convert the args to string parameters, the flags having =None
args = [(arg.name if not arg.is_flag
else '{}=None'.format(arg.name)) for arg in args
if not arg.flag_indicator and not arg.generic_definition]
# Write the __init__ function
if args:
builder.writeln('def __init__(self, {}):'.format(', '.join(args)))
else:
builder.writeln('def __init__(self):')
# Now update args to have the TLObject arguments, _except_
# those which are generated automatically: flag indicator and generic definitions.
# We don't need the generic definitions in Python because arguments can be any type
args = [arg for arg in tlobject.args
if not arg.flag_indicator and not arg.generic_definition]
if args:
# Write the docstring, so we know the type of the arguments
builder.writeln('"""')
for arg in args:
if not arg.flag_indicator:
builder.write(':param {}: Telegram type: «{}».'.format(arg.name, arg.type))
if arg.is_vector:
builder.write(' Must be a list.'.format(arg.name))
if arg.is_generic:
builder.write(' This should be another MTProtoRequest.')
builder.writeln()
builder.writeln('"""')
builder.writeln('super().__init__()')
# Functions have a result object and are confirmed by default
if tlobject.is_function:
builder.writeln('self.result = None')
builder.writeln('self.confirmed = True # Confirmed by default')
# Create an attribute that stores the TLObject's constructor ID
builder.writeln('self.constructor_id = {}'.format(hex(tlobject.id)))
# Set the arguments
if args:
# Leave an empty line if there are any args
builder.writeln()
for arg in args:
builder.writeln('self.{0} = {0}'.format(arg.name))
builder.end_block()
# Write the on_send(self, writer) function
builder.writeln('def on_send(self, writer):')
builder.writeln('writer.write_int(self.constructor_id, signed=False)'
.format(hex(tlobject.id), tlobject.name))
for arg in tlobject.args:
TLGenerator.write_onsend_code(builder, arg, tlobject.args)
builder.end_block()
# Generate the empty() function, which returns an "empty"
# instance, in which all attributes are set to None
builder.writeln('@staticmethod')
builder.writeln('def empty():')
builder.writeln('"""Returns an "empty" instance (all attributes are None)"""')
builder.writeln('return {}({})'.format(
TLGenerator.get_class_name(tlobject),
', '.join('None' for _ in range(len(args)))
))
builder.end_block()
# Write the on_response(self, reader) function
builder.writeln('def on_response(self, reader):')
# Do not read constructor's ID, since that's already been read somewhere else
if tlobject.is_function:
builder.writeln('self.result = reader.tgread_object()')
else:
if tlobject.args:
for arg in tlobject.args:
TLGenerator.write_onresponse_code(builder, arg, tlobject.args)
else:
# If there were no arguments, we still need an on_response method, and hence "pass" if empty
builder.writeln('pass')
builder.end_block()
# Write the __repr__(self) and __str__(self) functions
builder.writeln('def __repr__(self):')
builder.writeln("return '{}'".format(repr(tlobject)))
builder.end_block()
builder.writeln('def __str__(self):')
builder.writeln("return {}".format(str(tlobject)))
builder.end_block()
# Write JSON encoding
builder.writeln('def json_encode(self):')
builder.writeln('return json.dumps({')
builder.current_indent += 1
for arg in args:
if TLGenerator.is_tlobject(arg.type):
if arg.is_vector:
builder.writeln("'{0}': [{0}_item.json_encode() "
"for {0}_item in self.{0}],".format(arg.name))
else:
builder.writeln("'{0}': self.{0}.json_encode(),".format(arg.name))
else:
builder.writeln("'{0}': self.{0},".format(arg.name))
builder.current_indent -= 1
builder.writeln('})')
builder.end_block()
# Write JSON decoding
builder.writeln('@staticmethod')
builder.writeln('def json_decode(json_string):')
builder.writeln('# Create an empty instance which will be filled with the JSON values')
builder.writeln('instance = {}.empty()'.format(TLGenerator.get_class_name(tlobject)))
builder.writeln('dictionary = json.loads(json_string)')
builder.writeln()
for arg in args:
if TLGenerator.is_tlobject(arg.type):
if arg.is_vector:
builder.writeln("instance.{0} = [{0}_item.json_decode() "
"for {0}_item in dictionary['{0}']]".format(arg.name))
else:
builder.writeln("instance.{0} = dictionary['{0}'].json_decode() if '{0}' in dictionary "
"and dictionary['{0}'] is not None else None".format(arg.name))
else:
builder.writeln("instance.{0} = dictionary.get('{0}', None)".format(arg.name))
builder.writeln()
builder.writeln('return instance')
# builder.end_block() # There is no need to end the last block
# Once all the objects have been generated, we can now group them in a single file
filename = os.path.join(get_output_path('all_tlobjects.py'))
with open(filename, 'w', encoding='utf-8') as file:
with SourceBuilder(file) as builder:
builder.writeln('"""File generated by TLObjects\' generator. All changes will be ERASED"""')
builder.writeln()
# First add imports
for tlobject in tlobjects:
builder.writeln('import {}'.format(TLGenerator.get_full_file_name(tlobject)))
builder.writeln()
# Create a variable to indicate which layer this is
builder.writeln('layer = {} # Current generated layer'.format(TLParser.find_layer(scheme_file)))
builder.writeln()
# Then create the dictionary containing constructor_id: class
builder.writeln('tlobjects = {')
builder.current_indent += 1
# Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
for tlobject in tlobjects:
builder.writeln('{}: {}.{},'
.format(hex(tlobject.id),
TLGenerator.get_full_file_name(tlobject),
TLGenerator.get_class_name(tlobject)))
builder.current_indent -= 1
builder.writeln('}')
@staticmethod
def get_class_name(tlobject):
"""Gets the class name following the Python style guidelines, in ThisClassFormat"""
# Courtesy of http://stackoverflow.com/a/31531797/4759433
# Also, '_' could be replaced for ' ', then use .title(), and then remove ' '
result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), tlobject.name)
result = result[:1].upper() + result[1:].replace('_', '') # Replace again to fully ensure!
# If it's a function, let it end with "Request" to identify them more easily
if tlobject.is_function:
result += 'Request'
return result
@staticmethod
def get_full_file_name(tlobject):
"""Gets the full file name for the given TLObject (tl.type.full.path)"""
fullname = TLGenerator.get_file_name(tlobject, add_extension=False)
if tlobject.namespace:
fullname = '{}.{}'.format(tlobject.namespace, fullname)
if tlobject.is_function:
return 'telethon.tl.functions.{}'.format(fullname)
else:
return 'telethon.tl.types.{}'.format(fullname)
@staticmethod
def get_file_name(tlobject, add_extension):
"""Gets the file name in file_name_format.py for the given TLObject"""
# Courtesy of http://stackoverflow.com/a/1176023/4759433
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name)
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
if add_extension:
return result + '.py'
else:
return result
@staticmethod
def is_tlobject(type):
"""Determines whether the type is a "basic" type or another TLObject"""
return type not in ['int', 'long', 'int128', 'int256', 'double',
'string', 'Bool', 'true', 'bytes', 'date']
@staticmethod
def write_onsend_code(builder, arg, args, name=None):
"""
Writes the write code for the given argument
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments in TLObject same on_send. This is required to determine the flags value
:param name: The name of the argument. Defaults to «self.argname»
This argument is an option because it's required when writing Vectors<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
if name is None:
name = 'self.{}'.format(arg.name)
# The argument may be a flag, only write if it's not None AND if it's not a True type
# True types are not actually sent, but instead only used to determine the flags
if arg.is_flag:
if arg.type == 'true':
return # Exit, since True type is never written
else:
builder.writeln('if {}:'.format(name))
if arg.is_vector:
builder.writeln("writer.write_int(0x1cb5c415, signed=False) # Vector's constructor ID")
builder.writeln('writer.write_int(len({}))'.format(name))
builder.writeln('for {}_item in {}:'.format(arg.name, name))
# Temporary disable .is_vector, not to enter this if again
arg.is_vector = False
TLGenerator.write_onsend_code(builder, arg, args, name='{}_item'.format(arg.name))
arg.is_vector = True
elif arg.flag_indicator:
# Calculate the flags with those items which are not None
builder.writeln('# Calculate the flags. This equals to those flag arguments which are NOT None')
builder.writeln('flags = 0')
for flag in args:
if flag.is_flag:
builder.writeln('flags |= (1 << {}) if {} else 0'
.format(flag.flag_index, 'self.{}'.format(flag.name)))
builder.writeln('writer.write_int(flags)')
builder.writeln()
elif 'int' == arg.type:
builder.writeln('writer.write_int({})'.format(name))
elif 'long' == arg.type:
builder.writeln('writer.write_long({})'.format(name))
elif 'int128' == arg.type:
builder.writeln('writer.write_large_int({}, bits=128)'.format(name))
elif 'int256' == arg.type:
builder.writeln('writer.write_large_int({}, bits=256)'.format(name))
elif 'double' == arg.type:
builder.writeln('writer.write_double({})'.format(name))
elif 'string' == arg.type:
builder.writeln('writer.tgwrite_string({})'.format(name))
elif 'Bool' == arg.type:
builder.writeln('writer.tgwrite_bool({})'.format(name))
elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags
pass # These are actually NOT written! Only used for flags
elif 'bytes' == arg.type:
builder.writeln('writer.tgwrite_bytes({})'.format(name))
elif 'date' == arg.type: # Custom format
builder.writeln('writer.tgwrite_date({})'.format(name))
else:
# Else it may be a custom type
builder.writeln('{}.on_send(writer)'.format(name))
# End vector and flag blocks if required (if we opened them before)
if arg.is_vector:
builder.end_block()
if arg.is_flag:
builder.end_block()
@staticmethod
def write_onresponse_code(builder, arg, args, name=None):
"""
Writes the receive code for the given argument
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments in TLObject same on_send. This is required to determine the flags value
:param name: The name of the argument. Defaults to «self.argname»
This argument is an option because it's required when writing Vectors<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
if name is None:
name = 'self.{}'.format(arg.name)
# The argument may be a flag, only write that flag was given!
was_flag = False
if arg.is_flag:
was_flag = True
builder.writeln('if (flags & (1 << {})) != 0:'.format(arg.flag_index))
# Temporary disable .is_flag not to enter this if again when calling the method recursively
arg.is_flag = False
if arg.is_vector:
builder.writeln("reader.read_int() # Vector's constructor ID")
builder.writeln('{} = [] # Initialize an empty list'.format(name))
builder.writeln('{}_len = reader.read_int()'.format(arg.name))
builder.writeln('for _ in range({}_len):'.format(arg.name))
# Temporary disable .is_vector, not to enter this if again
arg.is_vector = False
TLGenerator.write_onresponse_code(builder, arg, args, name='{}_item'.format(arg.name))
builder.writeln('{}.append({}_item)'.format(name, arg.name))
arg.is_vector = True
elif arg.flag_indicator:
# Read the flags, which will indicate what items we should read next
builder.writeln('flags = reader.read_int()')
builder.writeln()
elif 'int' == arg.type:
builder.writeln('{} = reader.read_int()'.format(name))
elif 'long' == arg.type:
builder.writeln('{} = reader.read_long()'.format(name))
elif 'int128' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=128)'.format(name))
elif 'int256' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=256)'.format(name))
elif 'double' == arg.type:
builder.writeln('{} = reader.read_double()'.format(name))
elif 'string' == arg.type:
builder.writeln('{} = reader.tgread_string()'.format(name))
elif 'Bool' == arg.type:
builder.writeln('{} = reader.tgread_bool()'.format(name))
elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags
builder.writeln('{} = True # Arbitrary not-None value, no need to read since it is a flag'.format(name))
elif 'bytes' == arg.type:
builder.writeln('{} = reader.tgread_bytes()'.format(name))
elif 'date' == arg.type: # Custom format
builder.writeln('{} = reader.tgread_date()'.format(name))
else:
# Else it may be a custom type
builder.writeln('{} = reader.tgread_object()'.format(name))
# End vector and flag blocks if required (if we opened them before)
if arg.is_vector:
builder.end_block()
if was_flag:
builder.end_block()
# Restore .is_flag
arg.is_flag = True
if __name__ == '__main__':
if TLGenerator.tlobjects_exist():
print('Detected previous TLObjects. Cleaning...')
TLGenerator.clean_tlobjects()
print('Generating TLObjects...')
TLGenerator.generate_tlobjects('scheme.tl')
print('Done.')