Revert "TUN-6007: Implement new edge discovery algorithm"

This reverts commit 4f468b8a5d.
This commit is contained in:
Devin Carr
2022-06-14 16:08:03 -07:00
parent 0458ad41dd
commit 1d79831651
14 changed files with 590 additions and 1381 deletions

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/lucas-clemente/quic-go"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/connection"
@@ -41,7 +42,6 @@ type Supervisor struct {
config *TunnelConfig
orchestrator *orchestration.Orchestrator
edgeIPs *edgediscovery.Edge
edgeTunnelServer EdgeTunnelServer
tunnelErrors chan tunnelError
tunnelsConnecting map[int]chan struct{}
// nextConnectedIndex and nextConnectedSignal are used to wait for all
@@ -76,34 +76,12 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
if len(config.EdgeAddrs) > 0 {
edgeIPs, err = edgediscovery.StaticEdge(config.Log, config.EdgeAddrs)
} else {
edgeIPs, err = edgediscovery.ResolveEdge(config.Log, config.Region, config.EdgeIPVersion)
edgeIPs, err = edgediscovery.ResolveEdge(config.Log, config.Region)
}
if err != nil {
return nil, err
}
reconnectCredentialManager := newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections)
log := NewConnAwareLogger(config.Log, config.Observer)
var edgeAddrHandler EdgeAddrHandler
if config.EdgeIPVersion == allregions.IPv6Only || config.EdgeIPVersion == allregions.Auto {
edgeAddrHandler = &IPAddrFallback{}
} else { // IPv4Only
edgeAddrHandler = &DefaultAddrFallback{}
}
edgeTunnelServer := EdgeTunnelServer{
config: config,
cloudflaredUUID: cloudflaredUUID,
orchestrator: orchestrator,
credentialManager: reconnectCredentialManager,
edgeAddrs: edgeIPs,
edgeAddrHandler: edgeAddrHandler,
reconnectCh: reconnectCh,
gracefulShutdownC: gracefulShutdownC,
connAwareLogger: log,
}
useReconnectToken := false
if config.ClassicTunnel != nil {
useReconnectToken = config.ClassicTunnel.UseReconnectToken
@@ -114,12 +92,11 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
config: config,
orchestrator: orchestrator,
edgeIPs: edgeIPs,
edgeTunnelServer: edgeTunnelServer,
tunnelErrors: make(chan tunnelError),
tunnelsConnecting: map[int]chan struct{}{},
log: log,
log: NewConnAwareLogger(config.Log, config.Observer),
logTransport: config.LogTransport,
reconnectCredentialManager: reconnectCredentialManager,
reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections),
useReconnectToken: useReconnectToken,
reconnectCh: reconnectCh,
gracefulShutdownC: gracefulShutdownC,
@@ -166,18 +143,11 @@ func (s *Supervisor) Run(
tunnelsActive--
}
return nil
// startTunnel completed with a response
// startTunnel returned with error
// (note that this may also be caused by context cancellation)
case tunnelError := <-s.tunnelErrors:
tunnelsActive--
if tunnelError.err != nil && !shuttingDown {
switch tunnelError.err.(type) {
case ReconnectSignal:
// For tunnels that closed with reconnect signal, we reconnect immediately
go s.startTunnel(ctx, tunnelError.index, s.newConnectedTunnelSignal(tunnelError.index))
tunnelsActive++
continue
}
s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
tunnelsWaiting = append(tunnelsWaiting, tunnelError.index)
s.waitForNextTunnel(tunnelError.index)
@@ -185,9 +155,10 @@ func (s *Supervisor) Run(
if backoffTimer == nil {
backoffTimer = backoff.BackoffTimer()
}
// Previously we'd mark the edge address as bad here, but now we'll just silently use another.
} else if tunnelsActive == 0 {
s.log.ConnAwareLogger().Msg("no more connections active and exiting")
// All connected tunnels exited gracefully, no more work to do
// all connected tunnels exited gracefully, no more work to do
return nil
}
// Backoff was set and its timer expired
@@ -221,8 +192,6 @@ func (s *Supervisor) Run(
}
// Returns nil if initialization succeeded, else the initialization error.
// An attempt is first made to connect one tunnel; if it succeeds, the
// remaining tunnels up to config.HAConnections are connected.
func (s *Supervisor) initialize(
ctx context.Context,
connectedSignal *signal.Signal,
@@ -234,8 +203,6 @@ func (s *Supervisor) initialize(
}
go s.startFirstTunnel(ctx, connectedSignal)
// Wait for response from first tunnel before proceeding to attempt other HA edge tunnels
select {
case <-ctx.Done():
<-s.tunnelErrors
@@ -246,7 +213,6 @@ func (s *Supervisor) initialize(
return errEarlyShutdown
case <-connectedSignal.Wait():
}
// At least one successful connection, so start the rest
for i := 1; i < s.config.HAConnections; i++ {
ch := signal.New(make(chan struct{}))
@@ -263,42 +229,102 @@ func (s *Supervisor) startFirstTunnel(
connectedSignal *signal.Signal,
) {
var (
err error
addr *allregions.EdgeAddr
err error
)
const firstConnIndex = 0
defer func() {
s.tunnelErrors <- tunnelError{index: firstConnIndex, err: err}
}()
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, connectedSignal)
addr, err = s.edgeIPs.GetAddr(firstConnIndex)
if err != nil {
return
}
err = ServeTunnelLoop(
ctx,
s.reconnectCredentialManager,
s.config,
s.orchestrator,
addr,
s.log,
firstConnIndex,
connectedSignal,
s.cloudflaredUUID,
s.reconnectCh,
s.gracefulShutdownC,
)
// If the first tunnel disconnects, keep restarting it.
edgeErrors := 0
for s.unusedIPs() {
if ctx.Err() != nil {
return
}
if err == nil {
switch err.(type) {
case nil:
return
// try the next address if it was a quic.IdleTimeoutError, dialError(network problem) or
// dupConnRegisterTunnelError
case *quic.IdleTimeoutError, edgediscovery.DialError, connection.DupConnRegisterTunnelError:
edgeErrors++
default:
return
}
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, connectedSignal)
if edgeErrors >= 2 {
addr, err = s.edgeIPs.GetDifferentAddr(firstConnIndex)
if err != nil {
return
}
}
err = ServeTunnelLoop(
ctx,
s.reconnectCredentialManager,
s.config,
s.orchestrator,
addr,
s.log,
firstConnIndex,
connectedSignal,
s.cloudflaredUUID,
s.reconnectCh,
s.gracefulShutdownC,
)
}
}
// startTunnel starts a new tunnel connection. The resulting error will be sent on
// s.tunnelError as this is expected to run in a goroutine.
// s.tunnelErrors.
func (s *Supervisor) startTunnel(
ctx context.Context,
index int,
connectedSignal *signal.Signal,
) {
var (
err error
addr *allregions.EdgeAddr
err error
)
defer func() {
s.tunnelErrors <- tunnelError{index: index, err: err}
}()
err = s.edgeTunnelServer.Serve(ctx, uint8(index), connectedSignal)
addr, err = s.edgeIPs.GetDifferentAddr(index)
if err != nil {
return
}
err = ServeTunnelLoop(
ctx,
s.reconnectCredentialManager,
s.config,
s.orchestrator,
addr,
s.log,
uint8(index),
connectedSignal,
s.cloudflaredUUID,
s.reconnectCh,
s.gracefulShutdownC,
)
}
func (s *Supervisor) newConnectedTunnelSignal(index int) *signal.Signal {

View File

@@ -122,84 +122,28 @@ func StartTunnelDaemon(
return s.Run(ctx, connectedSignal)
}
// EdgeAddrHandler provides a mechanism to switch between behaviors for
// handling the errors returned when attempting to make edge connections.
// Implementations decide, per error, whether the connection should move to a
// different edge address.
type EdgeAddrHandler interface {
// ShouldGetNewAddress inspects the edge connection error err and reports:
//   - needsNewAddress: whether the current edge address should be replaced
//     with a new one;
//   - isConnectivityError: whether err should be recognized as a
//     connectivity error rather than a general application error.
ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool)
}
// DefaultAddrFallback provides the legacy address-rotation behavior in the new
// edge discovery algorithm: the current edge address is kept until two
// rotation-worthy errors have been observed, and isConnectivityError is always
// reported as false.
//
// The handler is stateful, so it must be used through a pointer
// (e.g. &DefaultAddrFallback{}); the zero value is ready to use.
//
// NOTE(review): edgeErrors is not synchronized. If a single handler is shared
// by several connection goroutines, the count is shared between them and
// access to it must be serialized by the caller — confirm how it is shared.
type DefaultAddrFallback struct {
	// edgeErrors counts the rotation-worthy errors seen since the last time a
	// new address was requested.
	edgeErrors int
}

// ShouldGetNewAddress reports whether the edge address should be replaced
// after the connection attempt returned err. needsNewAddress becomes true on
// every second quic idle timeout, duplicate connection registration, dial
// error or edge quic dial error; any other error (including nil) keeps the
// current address. isConnectivityError is always false for this handler.
//
// The receiver is a pointer so that the failure count persists between calls.
// With a value receiver each call would increment a throwaway copy, the count
// would never reach the threshold, and no new address would ever be requested.
func (f *DefaultAddrFallback) ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool) {
	switch err.(type) {
	case *quic.IdleTimeoutError,
		connection.DupConnRegisterTunnelError,
		edgediscovery.DialError,
		*connection.EdgeQuicDialError:
		// Wait for two failures before falling back to a new address.
		f.edgeErrors++
		if f.edgeErrors >= 2 {
			f.edgeErrors = 0
			return true, false
		}
	}
	// nil and every other error maintain the current IP address.
	return false, false
}
// IPAddrFallback rotates to a new edge address more eagerly than the legacy
// behavior: every error it recognizes requests a new address immediately, and
// dial failures are additionally reported as connectivity errors.
type IPAddrFallback struct{}

// ShouldGetNewAddress classifies err and reports whether a new edge address is
// needed and whether err counts as a connectivity error. A nil error, and any
// error type not listed below, keeps the current IP address.
func (f IPAddrFallback) ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool) {
	switch err.(type) {
	case connection.DupConnRegisterTunnelError,
		*quic.IdleTimeoutError:
		// Duplicate registrations and idle timeouts move to the next address
		// but are not treated as connectivity problems.
		needsNewAddress = true
	case edgediscovery.DialError, *connection.EdgeQuicDialError:
		// Network problems are retried with a new address immediately and
		// reported as connectivity errors.
		needsNewAddress, isConnectivityError = true, true
	}
	return needsNewAddress, isConnectivityError
}
// EdgeTunnelServer bundles the dependencies that its Serve method needs to run
// a tunnel connection for a given connection index.
type EdgeTunnelServer struct {
// config supplies the logger, retry limit, protocol selector and observer
// that Serve reads.
config *TunnelConfig
// cloudflaredUUID is passed through to ServeTunnel.
cloudflaredUUID uuid.UUID
// orchestrator is passed through to ServeTunnel.
orchestrator *orchestration.Orchestrator
// credentialManager is passed through to ServeTunnel.
credentialManager *reconnectCredentialManager
// edgeAddrHandler decides, from the error returned by ServeTunnel, whether
// the connection should be given a different edge address.
edgeAddrHandler EdgeAddrHandler
// edgeAddrs is the source of edge addresses; Serve looks up the address for
// its connection index here and asks it for a different one when needed.
edgeAddrs *edgediscovery.Edge
// reconnectCh is passed through to ServeTunnel.
reconnectCh chan ReconnectSignal
// gracefulShutdownC makes Serve return nil once it becomes readable.
gracefulShutdownC <-chan struct{}
// connAwareLogger is the base logger from which Serve derives a
// per-connection logger.
connAwareLogger *ConnAwareLogger
}
func (e EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, connectedSignal *signal.Signal) error {
func ServeTunnelLoop(
ctx context.Context,
credentialManager *reconnectCredentialManager,
config *TunnelConfig,
orchestrator *orchestration.Orchestrator,
addr *allregions.EdgeAddr,
connAwareLogger *ConnAwareLogger,
connIndex uint8,
connectedSignal *signal.Signal,
cloudflaredUUID uuid.UUID,
reconnectCh chan ReconnectSignal,
gracefulShutdownC <-chan struct{},
) error {
haConnections.Inc()
defer haConnections.Dec()
logger := config.Log.With().Uint8(connection.LogFieldConnIndex, connIndex).Logger()
connLog := connAwareLogger.ReplaceLogger(&logger)
protocolFallback := &protocolFallback{
retry.BackoffHandler{MaxRetries: e.config.Retries},
e.config.ProtocolSelector.Current(),
retry.BackoffHandler{MaxRetries: config.Retries},
config.ProtocolSelector.Current(),
false,
}
connectedFuse := h2mux.NewBooleanFuse()
@@ -210,81 +154,54 @@ func (e EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, connectedS
}()
// Ensure the above goroutine will terminate if we return without connecting
defer connectedFuse.Fuse(false)
// Fetch IP address to associated connection index
addr, err := e.edgeAddrs.GetAddr(int(connIndex))
switch err {
case nil: // no error
case edgediscovery.ErrNoAddressesLeft:
return err
default:
return err
}
logger := e.config.Log.With().
IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).
Uint8(connection.LogFieldConnIndex, connIndex).
Logger()
connLog := e.connAwareLogger.ReplaceLogger(&logger)
// Each connection to keep its own copy of protocol, because individual connections might fallback
// to another protocol when a particular metal doesn't support new protocol
// Each connection can also have its own IP version because individual connections might fall back
// to another IP version.
err, recoverable := ServeTunnel(
ctx,
connLog,
e.credentialManager,
e.config,
e.orchestrator,
addr,
connIndex,
connectedFuse,
protocolFallback,
e.cloudflaredUUID,
e.reconnectCh,
protocolFallback.protocol,
e.gracefulShutdownC,
)
// If the connection is recoverable, we want to maintain the same IP
// but backoff a reconnect with some duration.
if recoverable {
duration, ok := protocolFallback.GetMaxBackoffDuration(ctx)
if !ok {
return err
}
e.config.Observer.SendReconnect(connIndex)
connLog.Logger().Info().Msgf("Retrying connection in up to %s seconds", duration)
}
// Check if the connection error was from an IP issue with the host or
// establishing a connection to the edge and if so, rotate the IP address.
yes, hasConnectivityError := e.edgeAddrHandler.ShouldGetNewAddress(err)
if yes {
e.edgeAddrs.GetDifferentAddr(int(connIndex), hasConnectivityError)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-e.gracefulShutdownC:
return nil
case <-protocolFallback.BackoffTimer():
if !recoverable {
return err
}
if !selectNextProtocol(
connLog.Logger(),
for {
err, recoverable := ServeTunnel(
ctx,
connLog,
credentialManager,
config,
orchestrator,
addr,
connIndex,
connectedFuse,
protocolFallback,
e.config.ProtocolSelector,
err,
) {
return err
cloudflaredUUID,
reconnectCh,
protocolFallback.protocol,
gracefulShutdownC,
)
if recoverable {
duration, ok := protocolFallback.GetMaxBackoffDuration(ctx)
if !ok {
return err
}
config.Observer.SendReconnect(connIndex)
connLog.Logger().Info().Msgf("Retrying connection in up to %s seconds", duration)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-gracefulShutdownC:
return nil
case <-protocolFallback.BackoffTimer():
if !recoverable {
return err
}
if !selectNextProtocol(
connLog.Logger(),
protocolFallback,
config.ProtocolSelector,
err,
) {
return err
}
}
}
return err
}
// protocolFallback is a wrapper around backoffHandler that will try fallback option when backoff reaches
@@ -316,10 +233,6 @@ func selectNextProtocol(
) bool {
var idleTimeoutError *quic.IdleTimeoutError
isNetworkActivityTimeout := errors.As(cause, &idleTimeoutError)
edgeQuicDialError, ok := cause.(*connection.EdgeQuicDialError)
if !isNetworkActivityTimeout && ok {
isNetworkActivityTimeout = errors.As(edgeQuicDialError.Cause, &idleTimeoutError)
}
_, hasFallback := selector.Fallback()
if protocolBackoff.ReachedMaxRetries() || (hasFallback && isNetworkActivityTimeout) {
@@ -328,7 +241,7 @@ func selectNextProtocol(
"Cloudflare Network with `quic` protocol, then most likely your machine/network is getting its egress " +
"UDP to port 7844 (or others) blocked or dropped. Make sure to allow egress connectivity as per " +
"https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/configuration/ports-and-ips/\n" +
"If you are using private routing to this Tunnel, then UDP (and Private DNS Resolution) will not work " +
"If you are using private routing to this Tunnel, then UDP (and Private DNS Resolution) will not work" +
"unless your cloudflared can connect with Cloudflare Network with `quic`.")
}
@@ -413,12 +326,8 @@ func ServeTunnel(
connLog.ConnAwareLogger().Msg(activeIncidentsMsg(incidents))
}
return err.Cause, !err.Permanent
case *connection.EdgeQuicDialError:
// Don't retry connection for a dial error
return err, false
case ReconnectSignal:
connLog.Logger().Info().
IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).
Uint8(connection.LogFieldConnIndex, connIndex).
Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
err.DelayBeforeReconnect()
@@ -617,7 +526,6 @@ func ServeHTTP2(
err := listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
if err != nil {
// forcefully break the connection (this is only used for testing)
connLog.Logger().Debug().Msg("Forcefully breaking http2 connection")
_ = tlsServerConn.Close()
}
return err
@@ -676,7 +584,6 @@ func ServeQUIC(
err := listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
if err != nil {
// forcefully break the connection (this is only used for testing)
connLogger.Logger().Debug().Msg("Forcefully breaking quic connection")
quicConn.Close()
}
return err