mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 22:59:58 +00:00
TUN-6007: Implement new edge discovery algorithm
(cherry picked from commit 4f468b8a5d
)
This commit is contained in:
@@ -7,7 +7,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/lucas-clemente/quic-go"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/cloudflare/cloudflared/connection"
|
||||
@@ -42,6 +41,7 @@ type Supervisor struct {
|
||||
config *TunnelConfig
|
||||
orchestrator *orchestration.Orchestrator
|
||||
edgeIPs *edgediscovery.Edge
|
||||
edgeTunnelServer EdgeTunnelServer
|
||||
tunnelErrors chan tunnelError
|
||||
tunnelsConnecting map[int]chan struct{}
|
||||
// nextConnectedIndex and nextConnectedSignal are used to wait for all
|
||||
@@ -76,12 +76,34 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
|
||||
if len(config.EdgeAddrs) > 0 {
|
||||
edgeIPs, err = edgediscovery.StaticEdge(config.Log, config.EdgeAddrs)
|
||||
} else {
|
||||
edgeIPs, err = edgediscovery.ResolveEdge(config.Log, config.Region)
|
||||
edgeIPs, err = edgediscovery.ResolveEdge(config.Log, config.Region, config.EdgeIPVersion)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reconnectCredentialManager := newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections)
|
||||
log := NewConnAwareLogger(config.Log, config.Observer)
|
||||
|
||||
var edgeAddrHandler EdgeAddrHandler
|
||||
if config.EdgeIPVersion == allregions.IPv6Only || config.EdgeIPVersion == allregions.Auto {
|
||||
edgeAddrHandler = &IPAddrFallback{}
|
||||
} else { // IPv4Only
|
||||
edgeAddrHandler = &DefaultAddrFallback{}
|
||||
}
|
||||
|
||||
edgeTunnelServer := EdgeTunnelServer{
|
||||
config: config,
|
||||
cloudflaredUUID: cloudflaredUUID,
|
||||
orchestrator: orchestrator,
|
||||
credentialManager: reconnectCredentialManager,
|
||||
edgeAddrs: edgeIPs,
|
||||
edgeAddrHandler: edgeAddrHandler,
|
||||
reconnectCh: reconnectCh,
|
||||
gracefulShutdownC: gracefulShutdownC,
|
||||
connAwareLogger: log,
|
||||
}
|
||||
|
||||
useReconnectToken := false
|
||||
if config.ClassicTunnel != nil {
|
||||
useReconnectToken = config.ClassicTunnel.UseReconnectToken
|
||||
@@ -92,11 +114,12 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
|
||||
config: config,
|
||||
orchestrator: orchestrator,
|
||||
edgeIPs: edgeIPs,
|
||||
edgeTunnelServer: edgeTunnelServer,
|
||||
tunnelErrors: make(chan tunnelError),
|
||||
tunnelsConnecting: map[int]chan struct{}{},
|
||||
log: NewConnAwareLogger(config.Log, config.Observer),
|
||||
log: log,
|
||||
logTransport: config.LogTransport,
|
||||
reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections),
|
||||
reconnectCredentialManager: reconnectCredentialManager,
|
||||
useReconnectToken: useReconnectToken,
|
||||
reconnectCh: reconnectCh,
|
||||
gracefulShutdownC: gracefulShutdownC,
|
||||
@@ -143,11 +166,18 @@ func (s *Supervisor) Run(
|
||||
tunnelsActive--
|
||||
}
|
||||
return nil
|
||||
// startTunnel returned with error
|
||||
// startTunnel completed with a response
|
||||
// (note that this may also be caused by context cancellation)
|
||||
case tunnelError := <-s.tunnelErrors:
|
||||
tunnelsActive--
|
||||
if tunnelError.err != nil && !shuttingDown {
|
||||
switch tunnelError.err.(type) {
|
||||
case ReconnectSignal:
|
||||
// For tunnels that closed with reconnect signal, we reconnect immediately
|
||||
go s.startTunnel(ctx, tunnelError.index, s.newConnectedTunnelSignal(tunnelError.index))
|
||||
tunnelsActive++
|
||||
continue
|
||||
}
|
||||
s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
|
||||
tunnelsWaiting = append(tunnelsWaiting, tunnelError.index)
|
||||
s.waitForNextTunnel(tunnelError.index)
|
||||
@@ -155,10 +185,9 @@ func (s *Supervisor) Run(
|
||||
if backoffTimer == nil {
|
||||
backoffTimer = backoff.BackoffTimer()
|
||||
}
|
||||
|
||||
// Previously we'd mark the edge address as bad here, but now we'll just silently use another.
|
||||
} else if tunnelsActive == 0 {
|
||||
// all connected tunnels exited gracefully, no more work to do
|
||||
s.log.ConnAwareLogger().Msg("no more connections active and exiting")
|
||||
// All connected tunnels exited gracefully, no more work to do
|
||||
return nil
|
||||
}
|
||||
// Backoff was set and its timer expired
|
||||
@@ -192,6 +221,8 @@ func (s *Supervisor) Run(
|
||||
}
|
||||
|
||||
// Returns nil if initialization succeeded, else the initialization error.
|
||||
// Attempts here will be made to connect one tunnel, if successful, it will
|
||||
// connect the available tunnels up to config.HAConnections.
|
||||
func (s *Supervisor) initialize(
|
||||
ctx context.Context,
|
||||
connectedSignal *signal.Signal,
|
||||
@@ -203,6 +234,8 @@ func (s *Supervisor) initialize(
|
||||
}
|
||||
|
||||
go s.startFirstTunnel(ctx, connectedSignal)
|
||||
|
||||
// Wait for response from first tunnel before proceeding to attempt other HA edge tunnels
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
<-s.tunnelErrors
|
||||
@@ -213,6 +246,7 @@ func (s *Supervisor) initialize(
|
||||
return errEarlyShutdown
|
||||
case <-connectedSignal.Wait():
|
||||
}
|
||||
|
||||
// At least one successful connection, so start the rest
|
||||
for i := 1; i < s.config.HAConnections; i++ {
|
||||
ch := signal.New(make(chan struct{}))
|
||||
@@ -229,102 +263,42 @@ func (s *Supervisor) startFirstTunnel(
|
||||
connectedSignal *signal.Signal,
|
||||
) {
|
||||
var (
|
||||
addr *allregions.EdgeAddr
|
||||
err error
|
||||
err error
|
||||
)
|
||||
const firstConnIndex = 0
|
||||
defer func() {
|
||||
s.tunnelErrors <- tunnelError{index: firstConnIndex, err: err}
|
||||
}()
|
||||
|
||||
addr, err = s.edgeIPs.GetAddr(firstConnIndex)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, connectedSignal)
|
||||
|
||||
err = ServeTunnelLoop(
|
||||
ctx,
|
||||
s.reconnectCredentialManager,
|
||||
s.config,
|
||||
s.orchestrator,
|
||||
addr,
|
||||
s.log,
|
||||
firstConnIndex,
|
||||
connectedSignal,
|
||||
s.cloudflaredUUID,
|
||||
s.reconnectCh,
|
||||
s.gracefulShutdownC,
|
||||
)
|
||||
// If the first tunnel disconnects, keep restarting it.
|
||||
edgeErrors := 0
|
||||
for s.unusedIPs() {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
switch err.(type) {
|
||||
case nil:
|
||||
return
|
||||
// try the next address if it was a quic.IdleTimeoutError, dialError(network problem) or
|
||||
// dupConnRegisterTunnelError
|
||||
case *quic.IdleTimeoutError, edgediscovery.DialError, connection.DupConnRegisterTunnelError:
|
||||
edgeErrors++
|
||||
default:
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
if edgeErrors >= 2 {
|
||||
addr, err = s.edgeIPs.GetDifferentAddr(firstConnIndex)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
err = ServeTunnelLoop(
|
||||
ctx,
|
||||
s.reconnectCredentialManager,
|
||||
s.config,
|
||||
s.orchestrator,
|
||||
addr,
|
||||
s.log,
|
||||
firstConnIndex,
|
||||
connectedSignal,
|
||||
s.cloudflaredUUID,
|
||||
s.reconnectCh,
|
||||
s.gracefulShutdownC,
|
||||
)
|
||||
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, connectedSignal)
|
||||
}
|
||||
}
|
||||
|
||||
// startTunnel starts a new tunnel connection. The resulting error will be sent on
|
||||
// s.tunnelErrors.
|
||||
// s.tunnelError as this is expected to run in a goroutine.
|
||||
func (s *Supervisor) startTunnel(
|
||||
ctx context.Context,
|
||||
index int,
|
||||
connectedSignal *signal.Signal,
|
||||
) {
|
||||
var (
|
||||
addr *allregions.EdgeAddr
|
||||
err error
|
||||
err error
|
||||
)
|
||||
defer func() {
|
||||
s.tunnelErrors <- tunnelError{index: index, err: err}
|
||||
}()
|
||||
|
||||
addr, err = s.edgeIPs.GetDifferentAddr(index)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = ServeTunnelLoop(
|
||||
ctx,
|
||||
s.reconnectCredentialManager,
|
||||
s.config,
|
||||
s.orchestrator,
|
||||
addr,
|
||||
s.log,
|
||||
uint8(index),
|
||||
connectedSignal,
|
||||
s.cloudflaredUUID,
|
||||
s.reconnectCh,
|
||||
s.gracefulShutdownC,
|
||||
)
|
||||
err = s.edgeTunnelServer.Serve(ctx, uint8(index), connectedSignal)
|
||||
}
|
||||
|
||||
func (s *Supervisor) newConnectedTunnelSignal(index int) *signal.Signal {
|
||||
|
@@ -122,28 +122,84 @@ func StartTunnelDaemon(
|
||||
return s.Run(ctx, connectedSignal)
|
||||
}
|
||||
|
||||
func ServeTunnelLoop(
|
||||
ctx context.Context,
|
||||
credentialManager *reconnectCredentialManager,
|
||||
config *TunnelConfig,
|
||||
orchestrator *orchestration.Orchestrator,
|
||||
addr *allregions.EdgeAddr,
|
||||
connAwareLogger *ConnAwareLogger,
|
||||
connIndex uint8,
|
||||
connectedSignal *signal.Signal,
|
||||
cloudflaredUUID uuid.UUID,
|
||||
reconnectCh chan ReconnectSignal,
|
||||
gracefulShutdownC <-chan struct{},
|
||||
) error {
|
||||
// EdgeAddrHandler is the pluggable policy that decides how errors from edge
// connection attempts are handled, so that different behaviors can be swapped
// in when serving a tunnel.
type EdgeAddrHandler interface {
	// ShouldGetNewAddress inspects the error returned by an edge connection
	// attempt.
	//
	// needsNewAddress reports whether the current edge address should be
	// replaced with a new one. isConnectivityError reports whether the error
	// should be recognized as a connectivity error rather than a general
	// application error.
	ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool)
}
|
||||
|
||||
// DefaultAddrFallback will always return false for isConnectivityError since this
|
||||
// handler is a way to provide the legacy behavior in the new edge discovery algorithm.
|
||||
type DefaultAddrFallback struct {
|
||||
edgeErrors int
|
||||
}
|
||||
|
||||
func (f DefaultAddrFallback) ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool) {
|
||||
switch err.(type) {
|
||||
case nil: // maintain current IP address
|
||||
// Try the next address if it was a quic.IdleTimeoutError or
|
||||
// dupConnRegisterTunnelError
|
||||
case *quic.IdleTimeoutError,
|
||||
connection.DupConnRegisterTunnelError,
|
||||
edgediscovery.DialError,
|
||||
*connection.EdgeQuicDialError:
|
||||
// Wait for two failures before falling back to a new address
|
||||
f.edgeErrors++
|
||||
if f.edgeErrors >= 2 {
|
||||
f.edgeErrors = 0
|
||||
return true, false
|
||||
}
|
||||
default: // maintain current IP address
|
||||
}
|
||||
return false, false
|
||||
}
|
||||
|
||||
// IPAddrFallback will have more conditions to fall back to a new address for certain
|
||||
// edge connection errors. This means that this handler will return true for isConnectivityError
|
||||
// for more cases like duplicate connection register and edge quic dial errors.
|
||||
type IPAddrFallback struct{}
|
||||
|
||||
func (f IPAddrFallback) ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool) {
|
||||
switch err.(type) {
|
||||
case nil: // maintain current IP address
|
||||
// Try the next address if it was a quic.IdleTimeoutError
|
||||
// DupConnRegisterTunnelError needs to also receive a new ip address
|
||||
case connection.DupConnRegisterTunnelError,
|
||||
*quic.IdleTimeoutError:
|
||||
return true, false
|
||||
// Network problems should be retried with new address immediately and report
|
||||
// as connectivity error
|
||||
case edgediscovery.DialError, *connection.EdgeQuicDialError:
|
||||
return true, true
|
||||
default: // maintain current IP address
|
||||
}
|
||||
return false, false
|
||||
}
|
||||
|
||||
type EdgeTunnelServer struct {
|
||||
config *TunnelConfig
|
||||
cloudflaredUUID uuid.UUID
|
||||
orchestrator *orchestration.Orchestrator
|
||||
credentialManager *reconnectCredentialManager
|
||||
edgeAddrHandler EdgeAddrHandler
|
||||
edgeAddrs *edgediscovery.Edge
|
||||
reconnectCh chan ReconnectSignal
|
||||
gracefulShutdownC <-chan struct{}
|
||||
|
||||
connAwareLogger *ConnAwareLogger
|
||||
}
|
||||
|
||||
func (e EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, connectedSignal *signal.Signal) error {
|
||||
haConnections.Inc()
|
||||
defer haConnections.Dec()
|
||||
|
||||
logger := config.Log.With().Uint8(connection.LogFieldConnIndex, connIndex).Logger()
|
||||
connLog := connAwareLogger.ReplaceLogger(&logger)
|
||||
|
||||
protocolFallback := &protocolFallback{
|
||||
retry.BackoffHandler{MaxRetries: config.Retries},
|
||||
config.ProtocolSelector.Current(),
|
||||
retry.BackoffHandler{MaxRetries: e.config.Retries},
|
||||
e.config.ProtocolSelector.Current(),
|
||||
false,
|
||||
}
|
||||
connectedFuse := h2mux.NewBooleanFuse()
|
||||
@@ -154,54 +210,81 @@ func ServeTunnelLoop(
|
||||
}()
|
||||
// Ensure the above goroutine will terminate if we return without connecting
|
||||
defer connectedFuse.Fuse(false)
|
||||
|
||||
// Fetch IP address to associated connection index
|
||||
addr, err := e.edgeAddrs.GetAddr(int(connIndex))
|
||||
switch err {
|
||||
case nil: // no error
|
||||
case edgediscovery.ErrNoAddressesLeft:
|
||||
return err
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
||||
logger := e.config.Log.With().
|
||||
IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).
|
||||
Uint8(connection.LogFieldConnIndex, connIndex).
|
||||
Logger()
|
||||
connLog := e.connAwareLogger.ReplaceLogger(&logger)
|
||||
// Each connection to keep its own copy of protocol, because individual connections might fallback
|
||||
// to another protocol when a particular metal doesn't support new protocol
|
||||
for {
|
||||
err, recoverable := ServeTunnel(
|
||||
ctx,
|
||||
connLog,
|
||||
credentialManager,
|
||||
config,
|
||||
orchestrator,
|
||||
addr,
|
||||
connIndex,
|
||||
connectedFuse,
|
||||
protocolFallback,
|
||||
cloudflaredUUID,
|
||||
reconnectCh,
|
||||
protocolFallback.protocol,
|
||||
gracefulShutdownC,
|
||||
)
|
||||
// Each connection can also have its own IP version because individual connections might fall back
|
||||
// to another IP version.
|
||||
err, recoverable := ServeTunnel(
|
||||
ctx,
|
||||
connLog,
|
||||
e.credentialManager,
|
||||
e.config,
|
||||
e.orchestrator,
|
||||
addr,
|
||||
connIndex,
|
||||
connectedFuse,
|
||||
protocolFallback,
|
||||
e.cloudflaredUUID,
|
||||
e.reconnectCh,
|
||||
protocolFallback.protocol,
|
||||
e.gracefulShutdownC,
|
||||
)
|
||||
|
||||
if recoverable {
|
||||
duration, ok := protocolFallback.GetMaxBackoffDuration(ctx)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
config.Observer.SendReconnect(connIndex)
|
||||
connLog.Logger().Info().Msgf("Retrying connection in up to %s seconds", duration)
|
||||
// If the connection is recoverable, we want to maintain the same IP
|
||||
// but backoff a reconnect with some duration.
|
||||
if recoverable {
|
||||
duration, ok := protocolFallback.GetMaxBackoffDuration(ctx)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
e.config.Observer.SendReconnect(connIndex)
|
||||
connLog.Logger().Info().Msgf("Retrying connection in up to %s seconds", duration)
|
||||
}
|
||||
|
||||
// Check if the connection error was from an IP issue with the host or
|
||||
// establishing a connection to the edge and if so, rotate the IP address.
|
||||
yes, hasConnectivityError := e.edgeAddrHandler.ShouldGetNewAddress(err)
|
||||
if yes {
|
||||
e.edgeAddrs.GetDifferentAddr(int(connIndex), hasConnectivityError)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-e.gracefulShutdownC:
|
||||
return nil
|
||||
case <-protocolFallback.BackoffTimer():
|
||||
if !recoverable {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-gracefulShutdownC:
|
||||
return nil
|
||||
case <-protocolFallback.BackoffTimer():
|
||||
if !recoverable {
|
||||
return err
|
||||
}
|
||||
|
||||
if !selectNextProtocol(
|
||||
connLog.Logger(),
|
||||
protocolFallback,
|
||||
config.ProtocolSelector,
|
||||
err,
|
||||
) {
|
||||
return err
|
||||
}
|
||||
if !selectNextProtocol(
|
||||
connLog.Logger(),
|
||||
protocolFallback,
|
||||
e.config.ProtocolSelector,
|
||||
err,
|
||||
) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// protocolFallback is a wrapper around backoffHandler that will try fallback option when backoff reaches
|
||||
@@ -233,6 +316,10 @@ func selectNextProtocol(
|
||||
) bool {
|
||||
var idleTimeoutError *quic.IdleTimeoutError
|
||||
isNetworkActivityTimeout := errors.As(cause, &idleTimeoutError)
|
||||
edgeQuicDialError, ok := cause.(*connection.EdgeQuicDialError)
|
||||
if !isNetworkActivityTimeout && ok {
|
||||
isNetworkActivityTimeout = errors.As(edgeQuicDialError.Cause, &idleTimeoutError)
|
||||
}
|
||||
_, hasFallback := selector.Fallback()
|
||||
|
||||
if protocolBackoff.ReachedMaxRetries() || (hasFallback && isNetworkActivityTimeout) {
|
||||
@@ -241,7 +328,7 @@ func selectNextProtocol(
|
||||
"Cloudflare Network with `quic` protocol, then most likely your machine/network is getting its egress " +
|
||||
"UDP to port 7844 (or others) blocked or dropped. Make sure to allow egress connectivity as per " +
|
||||
"https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/configuration/ports-and-ips/\n" +
|
||||
"If you are using private routing to this Tunnel, then UDP (and Private DNS Resolution) will not work" +
|
||||
"If you are using private routing to this Tunnel, then UDP (and Private DNS Resolution) will not work " +
|
||||
"unless your cloudflared can connect with Cloudflare Network with `quic`.")
|
||||
}
|
||||
|
||||
@@ -326,8 +413,12 @@ func ServeTunnel(
|
||||
connLog.ConnAwareLogger().Msg(activeIncidentsMsg(incidents))
|
||||
}
|
||||
return err.Cause, !err.Permanent
|
||||
case *connection.EdgeQuicDialError:
|
||||
// Don't retry connection for a dial error
|
||||
return err, false
|
||||
case ReconnectSignal:
|
||||
connLog.Logger().Info().
|
||||
IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).
|
||||
Uint8(connection.LogFieldConnIndex, connIndex).
|
||||
Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
|
||||
err.DelayBeforeReconnect()
|
||||
@@ -526,6 +617,7 @@ func ServeHTTP2(
|
||||
err := listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
|
||||
if err != nil {
|
||||
// forcefully break the connection (this is only used for testing)
|
||||
connLog.Logger().Debug().Msg("Forcefully breaking http2 connection")
|
||||
_ = tlsServerConn.Close()
|
||||
}
|
||||
return err
|
||||
@@ -584,6 +676,7 @@ func ServeQUIC(
|
||||
err := listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
|
||||
if err != nil {
|
||||
// forcefully break the connection (this is only used for testing)
|
||||
connLogger.Logger().Debug().Msg("Forcefully breaking quic connection")
|
||||
quicConn.Close()
|
||||
}
|
||||
return err
|
||||
|
Reference in New Issue
Block a user