Make raw API types immutable

This commit is contained in:
Lonami Exo
2022-01-26 12:14:17 +01:00
parent d426099bf5
commit 070af28e85
5 changed files with 90 additions and 75 deletions

View File

@@ -85,6 +85,9 @@ def _write_modules(
# Import struct for the .__bytes__(self) serialization
builder.writeln('import struct')
# Import dataclasses in order to freeze the instances
builder.writeln('import dataclasses')
# Import datetime for type hinting
builder.writeln('from datetime import datetime')
@@ -187,37 +190,9 @@ def _write_source_code(tlobject, kind, builder, type_constructors):
def _write_class_init(tlobject, kind, type_constructors, builder):
builder.writeln()
builder.writeln()
builder.writeln('@dataclasses.dataclass(init=False, frozen=True)')
builder.writeln('class {}({}):', tlobject.class_name, kind)
# Define slots to help reduce the size of the objects a little bit.
# It's also good for knowing what fields an object has.
builder.write('__slots__ = (')
sep = ''
for arg in tlobject.real_args:
builder.write('{}{!r},', sep, arg.name)
sep = ' '
builder.writeln(')')
# Class-level variable to store its Telegram's constructor ID
builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id)
builder.writeln('SUBCLASS_OF_ID = {:#x}',
crc32(tlobject.result.encode('ascii')))
builder.writeln()
# Convert the args to string parameters, flags having =None
args = ['{}: {}{}'.format(
a.name, a.type_hint(), '=None' if a.is_flag or a.can_be_inferred else '')
for a in tlobject.real_args
]
# Write the __init__ function if it has any argument
if not tlobject.real_args:
return
if any(a.name in dir(builtins) for a in tlobject.real_args):
builder.writeln('# noinspection PyShadowingBuiltins')
builder.writeln("def __init__({}):", ', '.join(['self'] + args))
builder.writeln('"""')
if tlobject.is_function:
builder.write(':returns {}: ', tlobject.result)
@@ -236,47 +211,83 @@ def _write_class_init(tlobject, kind, type_constructors, builder):
builder.writeln('"""')
# Set the arguments
# Define slots to help reduce the size of the objects a little bit.
# It's also good for knowing what fields an object has.
builder.write('__slots__ = (')
sep = ''
for arg in tlobject.real_args:
if not arg.can_be_inferred:
builder.writeln('self.{0} = {0}', arg.name)
builder.write('{}{!r},', sep, arg.name)
sep = ' '
builder.writeln(')')
# Currently the only argument that can be
# inferred are those called 'random_id'
elif arg.name == 'random_id':
# Endianness doesn't really matter, and 'big' is shorter
code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \
.format(8 if arg.type == 'long' else 4)
# Class-level variable to store its Telegram's constructor ID
builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id)
builder.writeln('SUBCLASS_OF_ID = {:#x}',
crc32(tlobject.result.encode('ascii')))
builder.writeln()
if arg.is_vector:
# Currently for the case of "messages.forwardMessages"
# Ensure we can infer the length from id:Vector<>
if not next(a for a in tlobject.real_args
if a.name == 'id').is_vector:
raise ValueError(
'Cannot infer list of random ids for ', tlobject
)
code = '[{} for _ in range(len(id))]'.format(code)
# Because we're using __slots__ and frozen instances, we cannot have flags = None directly.
# See https://stackoverflow.com/q/50180735 (Python 3.10 does offer a solution).
# Write the __init__ function if it has any argument.
if tlobject.real_args:
# Convert the args to string parameters
for a in tlobject.real_args:
builder.writeln('{}: {}', a.name, a.type_hint())
builder.writeln(
"self.random_id = random_id if random_id "
"is not None else {}", code
)
else:
raise ValueError('Cannot infer a value for ', arg)
# Convert the args to string parameters, flags having =None
args = ['{}: {}{}'.format(
a.name, a.type_hint(), '=None' if a.is_flag or a.can_be_inferred else '')
for a in tlobject.real_args
]
builder.end_block()
if any(a.name in dir(builtins) for a in tlobject.real_args):
builder.writeln('# noinspection PyShadowingBuiltins')
builder.writeln("def __init__({}):", ', '.join(['self'] + args))
# Set the arguments
for arg in tlobject.real_args:
builder.writeln("object.__setattr__(self, '{0}', {0})", arg.name)
builder.end_block()
def _write_resolve(tlobject, builder):
if tlobject.is_function and any(
(arg.type in AUTO_CASTS
or ((arg.name, arg.type) in NAMED_AUTO_CASTS
and tlobject.fullname not in NAMED_BLACKLIST))
for arg in tlobject.real_args
(arg.can_be_inferred
or arg.type in AUTO_CASTS
or ((arg.name, arg.type) in NAMED_AUTO_CASTS and tlobject.fullname not in NAMED_BLACKLIST))
for arg in tlobject.real_args
):
builder.writeln('async def resolve(self, client, utils):')
builder.writeln('r = {}') # hold replacements
for arg in tlobject.real_args:
if arg.can_be_inferred:
builder.writeln('if self.{} is None:', arg.name)
# Currently the only argument that can be
# inferred are those called 'random_id'
if arg.name == 'random_id':
# Endianness doesn't really matter, and 'big' is shorter
code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \
.format(8 if arg.type == 'long' else 4)
if arg.is_vector:
# Currently for the case of "messages.forwardMessages"
# Ensure we can infer the length from id:Vector<>
if not next(a for a in tlobject.real_args if a.name == 'id').is_vector:
raise ValueError('Cannot infer list of random ids for ', tlobject)
code = '[{} for _ in range(len(self.id))]'.format(code)
builder.writeln("r['{}'] = {}", arg.name, code)
else:
raise ValueError('Cannot infer a value for ', arg)
builder.end_block()
continue
ac = AUTO_CASTS.get(arg.type)
if not ac:
ac = NAMED_AUTO_CASTS.get((arg.name, arg.type))
@@ -287,17 +298,17 @@ def _write_resolve(tlobject, builder):
builder.writeln('if self.{}:', arg.name)
if arg.is_vector:
builder.writeln('_tmp = []')
builder.writeln('for _x in self.{0}:', arg.name)
builder.writeln('_tmp.append({})', ac.format('_x'))
builder.writeln("r['{}'] = []", arg.name)
builder.writeln('for x in self.{0}:', arg.name)
builder.writeln("r['{}'].append({})", arg.name, ac.format('x'))
builder.end_block()
builder.writeln('self.{} = _tmp', arg.name)
else:
builder.writeln('self.{} = {}', arg.name,
ac.format('self.' + arg.name))
builder.writeln("r['{}'] = {}", arg.name, ac.format('self.' + arg.name))
if arg.is_flag:
builder.end_block()
builder.writeln('return dataclasses.replace(self, **r)')
builder.end_block()