Make logger fully configurable (#1087)

This commit is contained in:
Tulir Asokan
2019-01-11 16:52:30 +02:00
committed by Lonami
parent eda4178333
commit f271316d7d
15 changed files with 141 additions and 137 deletions

View File

@@ -1,7 +1,6 @@
import asyncio
import collections
import functools
import logging
from . import authenticator
from ..extensions.messagepacker import MessagePacker
@@ -24,8 +23,6 @@ from ..tl.types import (
)
from ..crypto import AuthKey
__log__ = logging.getLogger(__name__)
def _cancellable(func):
"""
@@ -59,12 +56,14 @@ class MTProtoSender:
A new authorization key will be generated on connection if no other
key exists yet.
"""
def __init__(self, auth_key, loop, *,
def __init__(self, auth_key, loop, *, loggers,
retries=5, delay=1, auto_reconnect=True, connect_timeout=None,
auth_key_callback=None,
update_callback=None, auto_reconnect_callback=None):
self._connection = None
self._loop = loop
self._loggers = loggers
self._log = loggers[__name__]
self._retries = retries
self._delay = delay
self._auto_reconnect = auto_reconnect
@@ -90,11 +89,12 @@ class MTProtoSender:
# Preserving the references of the AuthKey and state is important
self.auth_key = auth_key or AuthKey(None)
self._state = MTProtoState(self.auth_key)
self._state = MTProtoState(self.auth_key, loggers=self._loggers)
# Outgoing messages are put in a queue and sent in a batch.
# Note that here we're also storing their ``_RequestState``.
self._send_queue = MessagePacker(self._state, self._loop)
self._send_queue = MessagePacker(self._state, self._loop,
loggers=self._loggers)
# Sent states are remembered until a response is received.
self._pending_state = {}
@@ -132,7 +132,7 @@ class MTProtoSender:
Connects to the specified given connection using the given auth key.
"""
if self._user_connected:
__log__.info('User is already connected!')
self._log.info('User is already connected!')
return
self._connection = connection
@@ -210,13 +210,13 @@ class MTProtoSender:
authorization key if necessary, and starting the send and
receive loops.
"""
__log__.info('Connecting to %s...', self._connection)
self._log.info('Connecting to %s...', self._connection)
for retry in range(1, self._retries + 1):
try:
__log__.debug('Connection attempt {}...'.format(retry))
self._log.debug('Connection attempt {}...'.format(retry))
await self._connection.connect(timeout=self._connect_timeout)
except (ConnectionError, asyncio.TimeoutError) as e:
__log__.warning('Attempt {} at connecting failed: {}: {}'
self._log.warning('Attempt {} at connecting failed: {}: {}'
.format(retry, type(e).__name__, e))
await asyncio.sleep(self._delay)
else:
@@ -225,12 +225,12 @@ class MTProtoSender:
raise ConnectionError('Connection to Telegram failed {} times'
.format(self._retries))
__log__.debug('Connection success!')
self._log.debug('Connection success!')
if not self.auth_key:
plain = MTProtoPlainSender(self._connection)
for retry in range(1, self._retries + 1):
try:
__log__.debug('New auth_key attempt {}...'.format(retry))
self._log.debug('New auth_key attempt {}...'.format(retry))
self.auth_key.key, self._state.time_offset =\
await authenticator.do_authentication(plain)
@@ -243,7 +243,7 @@ class MTProtoSender:
break
except (SecurityError, AssertionError) as e:
__log__.warning('Attempt {} at new auth_key failed: {}'
self._log.warning('Attempt {} at new auth_key failed: {}'
.format(retry, e))
await asyncio.sleep(self._delay)
else:
@@ -252,10 +252,10 @@ class MTProtoSender:
self._disconnect(error=e)
raise e
__log__.debug('Starting send loop')
self._log.debug('Starting send loop')
self._send_loop_handle = self._loop.create_task(self._send_loop())
__log__.debug('Starting receive loop')
self._log.debug('Starting receive loop')
self._recv_loop_handle = self._loop.create_task(self._recv_loop())
# _disconnected only completes after manual disconnection
@@ -264,16 +264,16 @@ class MTProtoSender:
if self._disconnected.done():
self._disconnected = self._loop.create_future()
__log__.info('Connection to %s complete!', self._connection)
self._log.info('Connection to %s complete!', self._connection)
def _disconnect(self, error=None):
__log__.info('Disconnecting from %s...', self._connection)
self._log.info('Disconnecting from %s...', self._connection)
self._user_connected = False
try:
__log__.debug('Closing current connection...')
self._log.debug('Closing current connection...')
self._connection.disconnect()
finally:
__log__.debug('Cancelling {} pending message(s)...'
self._log.debug('Cancelling {} pending message(s)...'
.format(len(self._pending_state)))
for state in self._pending_state.values():
if error and not state.future.done():
@@ -286,14 +286,14 @@ class MTProtoSender:
self._last_ack = None
if self._send_loop_handle:
__log__.debug('Cancelling the send loop...')
self._log.debug('Cancelling the send loop...')
self._send_loop_handle.cancel()
if self._recv_loop_handle:
__log__.debug('Cancelling the receive loop...')
self._log.debug('Cancelling the receive loop...')
self._recv_loop_handle.cancel()
__log__.info('Disconnection from %s complete!', self._connection)
self._log.info('Disconnection from %s complete!', self._connection)
if self._disconnected and not self._disconnected.done():
if error:
self._disconnected.set_exception(error)
@@ -306,13 +306,13 @@ class MTProtoSender:
"""
self._reconnecting = True
__log__.debug('Closing current connection...')
self._log.debug('Closing current connection...')
self._connection.disconnect()
__log__.debug('Cancelling the send loop...')
self._log.debug('Cancelling the send loop...')
self._send_loop_handle.cancel()
__log__.debug('Cancelling the receive loop...')
self._log.debug('Cancelling the receive loop...')
self._recv_loop_handle.cancel()
self._reconnecting = False
@@ -325,12 +325,12 @@ class MTProtoSender:
try:
await self._connect()
except (ConnectionError, asyncio.TimeoutError) as e:
__log__.info('Failed reconnection retry %d/%d with %s',
self._log.info('Failed reconnection retry %d/%d with %s',
retry, retries, e.__class__.__name__)
await asyncio.sleep(self._delay)
except Exception:
__log__.exception('Unexpected exception reconnecting on '
self._log.exception('Unexpected exception reconnecting on '
'retry %d/%d', retry, retries)
await asyncio.sleep(self._delay)
@@ -343,7 +343,7 @@ class MTProtoSender:
break
else:
__log__.error('Failed to reconnect automatically.')
self._log.error('Failed to reconnect automatically.')
self._disconnect(error=ConnectionError())
def _start_reconnect(self):
@@ -368,7 +368,7 @@ class MTProtoSender:
self._last_acks.append(ack)
self._pending_ack.clear()
__log__.debug('Waiting for messages to send...')
self._log.debug('Waiting for messages to send...')
# TODO Wait for the connection send queue to be empty?
# This means that while it's not empty we can wait for
# more messages to be added to the send queue.
@@ -377,14 +377,14 @@ class MTProtoSender:
if not data:
continue
__log__.debug('Encrypting %d message(s) in %d bytes for sending',
self._log.debug('Encrypting %d message(s) in %d bytes for sending',
len(batch), len(data))
data = self._state.encrypt_message_data(data)
try:
await self._connection.send(data)
except ConnectionError:
__log__.info('Connection closed while sending data')
self._log.info('Connection closed while sending data')
self._start_reconnect()
return
@@ -397,7 +397,7 @@ class MTProtoSender:
if isinstance(s.request, TLRequest):
self._pending_state[s.msg_id] = s
__log__.debug('Encrypted messages put in a queue to be sent')
self._log.debug('Encrypted messages put in a queue to be sent')
@_cancellable
async def _recv_loop(self):
@@ -408,11 +408,11 @@ class MTProtoSender:
Besides `connect`, only this method ever receives data.
"""
while self._user_connected and not self._reconnecting:
__log__.debug('Receiving items from the network...')
self._log.debug('Receiving items from the network...')
try:
body = await self._connection.recv()
except ConnectionError:
__log__.info('Connection closed while receiving data')
self._log.info('Connection closed while receiving data')
self._start_reconnect()
return
@@ -420,20 +420,20 @@ class MTProtoSender:
message = self._state.decrypt_message_data(body)
except TypeNotFoundError as e:
# Received object which we don't know how to deserialize
__log__.info('Type %08x not found, remaining data %r',
self._log.info('Type %08x not found, remaining data %r',
e.invalid_constructor_id, e.remaining)
continue
except SecurityError as e:
# A step while decoding had the incorrect data. This message
# should not be considered safe and it should be ignored.
__log__.warning('Security error while unpacking a '
self._log.warning('Security error while unpacking a '
'received message: %s', e)
continue
except BufferError as e:
if isinstance(e, InvalidBufferError) and e.code == 404:
__log__.info('Broken authorization key; resetting')
self._log.info('Broken authorization key; resetting')
else:
__log__.warning('Invalid buffer %s', e)
self._log.warning('Invalid buffer %s', e)
self.auth_key.key = None
if self._auth_key_callback:
@@ -442,14 +442,14 @@ class MTProtoSender:
self._start_reconnect()
return
except Exception:
__log__.exception('Unhandled error while receiving data')
self._log.exception('Unhandled error while receiving data')
self._start_reconnect()
return
try:
await self._process_message(message)
except Exception:
__log__.exception('Unhandled error while processing msgs')
self._log.exception('Unhandled error while processing msgs')
# Response Handlers
@@ -498,7 +498,7 @@ class MTProtoSender:
"""
rpc_result = message.obj
state = self._pending_state.pop(rpc_result.req_msg_id, None)
__log__.debug('Handling RPC result for message %d',
self._log.debug('Handling RPC result for message %d',
rpc_result.req_msg_id)
if not state:
@@ -511,7 +511,7 @@ class MTProtoSender:
if not isinstance(reader.tgread_object(), upload.File):
raise ValueError('Not an upload.File')
except (TypeNotFoundError, ValueError):
__log__.info('Received response without parent request: {}'
self._log.info('Received response without parent request: {}'
.format(rpc_result.body))
return
@@ -535,7 +535,7 @@ class MTProtoSender:
msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
"""
__log__.debug('Handling container')
self._log.debug('Handling container')
for inner_message in message.obj.messages:
await self._process_message(inner_message)
@@ -545,13 +545,13 @@ class MTProtoSender:
gzip_packed#3072cfa1 packed_data:bytes = Object;
"""
__log__.debug('Handling gzipped data')
self._log.debug('Handling gzipped data')
with BinaryReader(message.obj.data) as reader:
message.obj = reader.tgread_object()
await self._process_message(message)
async def _handle_update(self, message):
__log__.debug('Handling update {}'
self._log.debug('Handling update {}'
.format(message.obj.__class__.__name__))
if self._update_callback:
await self._update_callback(message.obj)
@@ -564,7 +564,7 @@ class MTProtoSender:
pong#347773c5 msg_id:long ping_id:long = Pong;
"""
pong = message.obj
__log__.debug('Handling pong for message %d', pong.msg_id)
self._log.debug('Handling pong for message %d', pong.msg_id)
state = self._pending_state.pop(pong.msg_id, None)
if state:
state.future.set_result(pong)
@@ -578,12 +578,12 @@ class MTProtoSender:
error_code:int new_server_salt:long = BadMsgNotification;
"""
bad_salt = message.obj
__log__.debug('Handling bad salt for message %d', bad_salt.bad_msg_id)
self._log.debug('Handling bad salt for message %d', bad_salt.bad_msg_id)
self._state.salt = bad_salt.new_server_salt
states = self._pop_states(bad_salt.bad_msg_id)
self._send_queue.extend(states)
__log__.debug('%d message(s) will be resent', len(states))
self._log.debug('%d message(s) will be resent', len(states))
async def _handle_bad_notification(self, message):
"""
@@ -596,13 +596,13 @@ class MTProtoSender:
bad_msg = message.obj
states = self._pop_states(bad_msg.bad_msg_id)
__log__.debug('Handling bad msg %s', bad_msg)
self._log.debug('Handling bad msg %s', bad_msg)
if bad_msg.error_code in (16, 17):
# Sent msg_id too low or too high (respectively).
# Use the current msg_id to determine the right time offset.
to = self._state.update_time_offset(
correct_msg_id=message.msg_id)
__log__.info('System clock is wrong, set time offset to %ds', to)
self._log.info('System clock is wrong, set time offset to %ds', to)
elif bad_msg.error_code == 32:
# msg_seqno too low, so just pump it up by some "large" amount
# TODO A better fix would be to start with a new fresh session ID
@@ -618,7 +618,8 @@ class MTProtoSender:
# Messages are to be re-sent once we've corrected the issue
self._send_queue.extend(states)
__log__.debug('%d messages will be resent due to bad msg', len(states))
self._log.debug('%d messages will be resent due to bad msg',
len(states))
async def _handle_detailed_info(self, message):
"""
@@ -629,7 +630,7 @@ class MTProtoSender:
"""
# TODO https://goo.gl/VvpCC6
msg_id = message.obj.answer_msg_id
__log__.debug('Handling detailed info for message %d', msg_id)
self._log.debug('Handling detailed info for message %d', msg_id)
self._pending_ack.add(msg_id)
async def _handle_new_detailed_info(self, message):
@@ -641,7 +642,7 @@ class MTProtoSender:
"""
# TODO https://goo.gl/G7DPsR
msg_id = message.obj.answer_msg_id
__log__.debug('Handling new detailed info for message %d', msg_id)
self._log.debug('Handling new detailed info for message %d', msg_id)
self._pending_ack.add(msg_id)
async def _handle_new_session_created(self, message):
@@ -652,7 +653,7 @@ class MTProtoSender:
server_salt:long = NewSession;
"""
# TODO https://goo.gl/LMyN7A
__log__.debug('Handling new session created')
self._log.debug('Handling new session created')
self._state.salt = message.obj.server_salt
async def _handle_ack(self, message):
@@ -671,7 +672,7 @@ class MTProtoSender:
messages are acknowledged.
"""
ack = message.obj
__log__.debug('Handling acknowledge for %s', str(ack.msg_ids))
self._log.debug('Handling acknowledge for %s', str(ack.msg_ids))
for msg_id in ack.msg_ids:
state = self._pending_state.get(msg_id)
if state and isinstance(state.request, LogOutRequest):
@@ -688,7 +689,7 @@ class MTProtoSender:
"""
# TODO save these salts and automatically adjust to the
# correct one whenever the salt in use expires.
__log__.debug('Handling future salts for message %d', message.msg_id)
self._log.debug('Handling future salts for message %d', message.msg_id)
state = self._pending_state.pop(message.msg_id, None)
if state:
state.future.set_result(message.obj)