TUN-6191: Update quic-go to v0.27.1 and with custom patch to allow keep alive period to be configurable

The idle period is set to 5sec.

We now also ping every second since last activity.
This makes the quic.Connection less prone to being closed with
no network activity, since we send multiple pings per idle
period, and thus a single packet loss cannot cause the problem.
This commit is contained in:
Nuno Diegues
2022-06-06 14:15:35 +01:00
parent 4ccef23dbc
commit 475939a77f
49 changed files with 671 additions and 486 deletions

View File

@@ -36,14 +36,14 @@ type unknownPacketHandler interface {
type packetHandlerManager interface {
AddWithConnID(protocol.ConnectionID, protocol.ConnectionID, func() packetHandler) bool
Destroy() error
sessionRunner
connRunner
SetServer(unknownPacketHandler)
CloseServer()
}
type quicSession interface {
EarlySession
earlySessionReady() <-chan struct{}
type quicConn interface {
EarlyConnection
earlyConnReady() <-chan struct{}
handlePacket(*receivedPacket)
GetVersion() protocol.VersionNumber
getPerspective() protocol.Perspective
@@ -56,26 +56,26 @@ type quicSession interface {
type baseServer struct {
mutex sync.Mutex
acceptEarlySessions bool
acceptEarlyConns bool
tlsConf *tls.Config
config *Config
conn connection
conn rawConn
// If the server is started with ListenAddr, we create a packet conn.
// If it is started with Listen, we take a packet conn as a parameter.
createdPacketConn bool
tokenGenerator *handshake.TokenGenerator
sessionHandler packetHandlerManager
connHandler packetHandlerManager
receivedPackets chan *receivedPacket
// set as a member, so they can be set in the tests
newSession func(
newConn func(
sendConn,
sessionRunner,
connRunner,
protocol.ConnectionID, /* original dest connection ID */
*protocol.ConnectionID, /* retry src connection ID */
protocol.ConnectionID, /* client dest connection ID */
@@ -90,15 +90,15 @@ type baseServer struct {
uint64,
utils.Logger,
protocol.VersionNumber,
) quicSession
) quicConn
serverError error
errorChan chan struct{}
closed bool
running chan struct{} // closed as soon as run() returns
sessionQueue chan quicSession
sessionQueueLen int32 // to be used as an atomic
connQueue chan quicConn
connQueueLen int32 // to be used as an atomic
logger utils.Logger
}
@@ -112,7 +112,7 @@ type earlyServer struct{ *baseServer }
var _ EarlyListener = &earlyServer{}
func (s *earlyServer) Accept(ctx context.Context) (EarlySession, error) {
func (s *earlyServer) Accept(ctx context.Context) (EarlyConnection, error) {
return s.baseServer.accept(ctx)
}
@@ -123,7 +123,7 @@ func ListenAddr(addr string, tlsConf *tls.Config, config *Config) (Listener, err
return listenAddr(addr, tlsConf, config, false)
}
// ListenAddrEarly works like ListenAddr, but it returns sessions before the handshake completes.
// ListenAddrEarly works like ListenAddr, but it returns connections before the handshake completes.
func ListenAddrEarly(addr string, tlsConf *tls.Config, config *Config) (EarlyListener, error) {
s, err := listenAddr(addr, tlsConf, config, true)
if err != nil {
@@ -164,7 +164,7 @@ func Listen(conn net.PacketConn, tlsConf *tls.Config, config *Config) (Listener,
return listen(conn, tlsConf, config, false)
}
// ListenEarly works like Listen, but it returns sessions before the handshake completes.
// ListenEarly works like Listen, but it returns connections before the handshake completes.
func ListenEarly(conn net.PacketConn, tlsConf *tls.Config, config *Config) (EarlyListener, error) {
s, err := listen(conn, tlsConf, config, true)
if err != nil {
@@ -187,7 +187,7 @@ func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config, acceptEarl
}
}
sessionHandler, err := getMultiplexer().AddConn(conn, config.ConnectionIDLength, config.StatelessResetKey, config.Tracer)
connHandler, err := getMultiplexer().AddConn(conn, config.ConnectionIDLength, config.StatelessResetKey, config.Tracer)
if err != nil {
return nil, err
}
@@ -200,21 +200,21 @@ func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config, acceptEarl
return nil, err
}
s := &baseServer{
conn: c,
tlsConf: tlsConf,
config: config,
tokenGenerator: tokenGenerator,
sessionHandler: sessionHandler,
sessionQueue: make(chan quicSession),
errorChan: make(chan struct{}),
running: make(chan struct{}),
receivedPackets: make(chan *receivedPacket, protocol.MaxServerUnprocessedPackets),
newSession: newSession,
logger: utils.DefaultLogger.WithPrefix("server"),
acceptEarlySessions: acceptEarly,
conn: c,
tlsConf: tlsConf,
config: config,
tokenGenerator: tokenGenerator,
connHandler: connHandler,
connQueue: make(chan quicConn),
errorChan: make(chan struct{}),
running: make(chan struct{}),
receivedPackets: make(chan *receivedPacket, protocol.MaxServerUnprocessedPackets),
newConn: newConnection,
logger: utils.DefaultLogger.WithPrefix("server"),
acceptEarlyConns: acceptEarly,
}
go s.run()
sessionHandler.SetServer(s)
connHandler.SetServer(s)
s.logger.Debugf("Listening for %s connections on %s", conn.LocalAddr().Network(), conn.LocalAddr().String())
return s, nil
}
@@ -258,19 +258,19 @@ var defaultAcceptToken = func(clientAddr net.Addr, token *Token) bool {
return sourceAddr == token.RemoteAddr
}
// Accept returns sessions that already completed the handshake.
// It is only valid if acceptEarlySessions is false.
func (s *baseServer) Accept(ctx context.Context) (Session, error) {
// Accept returns connections that already completed the handshake.
// It is only valid if acceptEarlyConns is false.
func (s *baseServer) Accept(ctx context.Context) (Connection, error) {
return s.accept(ctx)
}
func (s *baseServer) accept(ctx context.Context) (quicSession, error) {
func (s *baseServer) accept(ctx context.Context) (quicConn, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case sess := <-s.sessionQueue:
atomic.AddInt32(&s.sessionQueueLen, -1)
return sess, nil
case conn := <-s.connQueue:
atomic.AddInt32(&s.connQueueLen, -1)
return conn, nil
case <-s.errorChan:
return nil, s.serverError
}
@@ -294,9 +294,9 @@ func (s *baseServer) Close() error {
s.mutex.Unlock()
<-s.running
s.sessionHandler.CloseServer()
s.connHandler.CloseServer()
if createdPacketConn {
return s.sessionHandler.Destroy()
return s.connHandler.Destroy()
}
return nil
}
@@ -336,7 +336,7 @@ func (s *baseServer) handlePacketImpl(p *receivedPacket) bool /* is the buffer s
}
return false
}
// If we're creating a new session, the packet will be passed to the session.
// If we're creating a new connection, the packet will be passed to the connection.
// The header will then be parsed again.
hdr, _, _, err := wire.ParsePacket(p.data, s.config.ConnectionIDLength)
if err != nil && err != wire.ErrUnsupportedVersion {
@@ -436,7 +436,7 @@ func (s *baseServer) handleInitialImpl(p *receivedPacket, hdr *wire.Header) erro
return nil
}
if queueLen := atomic.LoadInt32(&s.sessionQueueLen); queueLen >= protocol.MaxAcceptQueueSize {
if queueLen := atomic.LoadInt32(&s.connQueueLen); queueLen >= protocol.MaxAcceptQueueSize {
s.logger.Debugf("Rejecting new connection. Server currently busy. Accept queue length: %d (max %d)", queueLen, protocol.MaxAcceptQueueSize)
go func() {
defer p.buffer.Release()
@@ -452,9 +452,9 @@ func (s *baseServer) handleInitialImpl(p *receivedPacket, hdr *wire.Header) erro
return err
}
s.logger.Debugf("Changing connection ID to %s.", connID)
var sess quicSession
tracingID := nextSessionTracingID()
if added := s.sessionHandler.AddWithConnID(hdr.DestConnectionID, connID, func() packetHandler {
var conn quicConn
tracingID := nextConnTracingID()
if added := s.connHandler.AddWithConnID(hdr.DestConnectionID, connID, func() packetHandler {
var tracer logging.ConnectionTracer
if s.config.Tracer != nil {
// Use the same connection ID that is passed to the client's GetLogWriter callback.
@@ -463,74 +463,74 @@ func (s *baseServer) handleInitialImpl(p *receivedPacket, hdr *wire.Header) erro
connID = origDestConnID
}
tracer = s.config.Tracer.TracerForConnection(
context.WithValue(context.Background(), SessionTracingKey, tracingID),
context.WithValue(context.Background(), ConnectionTracingKey, tracingID),
protocol.PerspectiveServer,
connID,
)
}
sess = s.newSession(
conn = s.newConn(
newSendConn(s.conn, p.remoteAddr, p.info),
s.sessionHandler,
s.connHandler,
origDestConnID,
retrySrcConnID,
hdr.DestConnectionID,
hdr.SrcConnectionID,
connID,
s.sessionHandler.GetStatelessResetToken(connID),
s.connHandler.GetStatelessResetToken(connID),
s.config,
s.tlsConf,
s.tokenGenerator,
s.acceptEarlySessions,
s.acceptEarlyConns,
tracer,
tracingID,
s.logger,
hdr.Version,
)
sess.handlePacket(p)
return sess
conn.handlePacket(p)
return conn
}); !added {
return nil
}
go sess.run()
go s.handleNewSession(sess)
if sess == nil {
go conn.run()
go s.handleNewConn(conn)
if conn == nil {
p.buffer.Release()
return nil
}
return nil
}
func (s *baseServer) handleNewSession(sess quicSession) {
sessCtx := sess.Context()
if s.acceptEarlySessions {
// wait until the early session is ready (or the handshake fails)
func (s *baseServer) handleNewConn(conn quicConn) {
connCtx := conn.Context()
if s.acceptEarlyConns {
// wait until the early connection is ready (or the handshake fails)
select {
case <-sess.earlySessionReady():
case <-sessCtx.Done():
case <-conn.earlyConnReady():
case <-connCtx.Done():
return
}
} else {
// wait until the handshake is complete (or fails)
select {
case <-sess.HandshakeComplete().Done():
case <-sessCtx.Done():
case <-conn.HandshakeComplete().Done():
case <-connCtx.Done():
return
}
}
atomic.AddInt32(&s.sessionQueueLen, 1)
atomic.AddInt32(&s.connQueueLen, 1)
select {
case s.sessionQueue <- sess:
// blocks until the session is accepted
case <-sessCtx.Done():
atomic.AddInt32(&s.sessionQueueLen, -1)
// don't pass sessions that were already closed to Accept()
case s.connQueue <- conn:
// blocks until the connection is accepted
case <-connCtx.Done():
atomic.AddInt32(&s.connQueueLen, -1)
// don't pass connections that were already closed to Accept()
}
}
func (s *baseServer) sendRetry(remoteAddr net.Addr, hdr *wire.Header, info *packetInfo) error {
// Log the Initial packet now.
// If no Retry is sent, the packet will be logged by the session.
// If no Retry is sent, the packet will be logged by the connection.
(&wire.ExtendedHeader{Header: *hdr}).Log(s.logger)
srcConnID, err := protocol.GenerateConnectionID(s.config.ConnectionIDLength)
if err != nil {