mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 15:49:58 +00:00
TUN-6007: Implement new edge discovery algorithm
This commit is contained in:
@@ -7,7 +7,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/lucas-clemente/quic-go"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/cloudflare/cloudflared/connection"
|
||||
@@ -42,6 +41,7 @@ type Supervisor struct {
|
||||
config *TunnelConfig
|
||||
orchestrator *orchestration.Orchestrator
|
||||
edgeIPs *edgediscovery.Edge
|
||||
edgeTunnelServer EdgeTunnelServer
|
||||
tunnelErrors chan tunnelError
|
||||
tunnelsConnecting map[int]chan struct{}
|
||||
// nextConnectedIndex and nextConnectedSignal are used to wait for all
|
||||
@@ -76,12 +76,34 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
|
||||
if len(config.EdgeAddrs) > 0 {
|
||||
edgeIPs, err = edgediscovery.StaticEdge(config.Log, config.EdgeAddrs)
|
||||
} else {
|
||||
edgeIPs, err = edgediscovery.ResolveEdge(config.Log, config.Region)
|
||||
edgeIPs, err = edgediscovery.ResolveEdge(config.Log, config.Region, config.EdgeIPVersion)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reconnectCredentialManager := newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections)
|
||||
log := NewConnAwareLogger(config.Log, config.Observer)
|
||||
|
||||
var edgeAddrHandler EdgeAddrHandler
|
||||
if config.EdgeIPVersion == allregions.IPv6Only || config.EdgeIPVersion == allregions.Auto {
|
||||
edgeAddrHandler = &IPAddrFallback{}
|
||||
} else { // IPv4Only
|
||||
edgeAddrHandler = &DefaultAddrFallback{}
|
||||
}
|
||||
|
||||
edgeTunnelServer := EdgeTunnelServer{
|
||||
config: config,
|
||||
cloudflaredUUID: cloudflaredUUID,
|
||||
orchestrator: orchestrator,
|
||||
credentialManager: reconnectCredentialManager,
|
||||
edgeAddrs: edgeIPs,
|
||||
edgeAddrHandler: edgeAddrHandler,
|
||||
reconnectCh: reconnectCh,
|
||||
gracefulShutdownC: gracefulShutdownC,
|
||||
connAwareLogger: log,
|
||||
}
|
||||
|
||||
useReconnectToken := false
|
||||
if config.ClassicTunnel != nil {
|
||||
useReconnectToken = config.ClassicTunnel.UseReconnectToken
|
||||
@@ -92,11 +114,12 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
|
||||
config: config,
|
||||
orchestrator: orchestrator,
|
||||
edgeIPs: edgeIPs,
|
||||
edgeTunnelServer: edgeTunnelServer,
|
||||
tunnelErrors: make(chan tunnelError),
|
||||
tunnelsConnecting: map[int]chan struct{}{},
|
||||
log: NewConnAwareLogger(config.Log, config.Observer),
|
||||
log: log,
|
||||
logTransport: config.LogTransport,
|
||||
reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections),
|
||||
reconnectCredentialManager: reconnectCredentialManager,
|
||||
useReconnectToken: useReconnectToken,
|
||||
reconnectCh: reconnectCh,
|
||||
gracefulShutdownC: gracefulShutdownC,
|
||||
@@ -143,11 +166,18 @@ func (s *Supervisor) Run(
|
||||
tunnelsActive--
|
||||
}
|
||||
return nil
|
||||
// startTunnel returned with error
|
||||
// startTunnel completed with a response
|
||||
// (note that this may also be caused by context cancellation)
|
||||
case tunnelError := <-s.tunnelErrors:
|
||||
tunnelsActive--
|
||||
if tunnelError.err != nil && !shuttingDown {
|
||||
switch tunnelError.err.(type) {
|
||||
case ReconnectSignal:
|
||||
// For tunnels that closed with reconnect signal, we reconnect immediately
|
||||
go s.startTunnel(ctx, tunnelError.index, s.newConnectedTunnelSignal(tunnelError.index))
|
||||
tunnelsActive++
|
||||
continue
|
||||
}
|
||||
s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
|
||||
tunnelsWaiting = append(tunnelsWaiting, tunnelError.index)
|
||||
s.waitForNextTunnel(tunnelError.index)
|
||||
@@ -155,10 +185,9 @@ func (s *Supervisor) Run(
|
||||
if backoffTimer == nil {
|
||||
backoffTimer = backoff.BackoffTimer()
|
||||
}
|
||||
|
||||
// Previously we'd mark the edge address as bad here, but now we'll just silently use another.
|
||||
} else if tunnelsActive == 0 {
|
||||
// all connected tunnels exited gracefully, no more work to do
|
||||
s.log.ConnAwareLogger().Msg("no more connections active and exiting")
|
||||
// All connected tunnels exited gracefully, no more work to do
|
||||
return nil
|
||||
}
|
||||
// Backoff was set and its timer expired
|
||||
@@ -192,6 +221,8 @@ func (s *Supervisor) Run(
|
||||
}
|
||||
|
||||
// Returns nil if initialization succeeded, else the initialization error.
|
||||
// Attempts here will be made to connect one tunnel, if successful, it will
|
||||
// connect the available tunnels up to config.HAConnections.
|
||||
func (s *Supervisor) initialize(
|
||||
ctx context.Context,
|
||||
connectedSignal *signal.Signal,
|
||||
@@ -203,6 +234,8 @@ func (s *Supervisor) initialize(
|
||||
}
|
||||
|
||||
go s.startFirstTunnel(ctx, connectedSignal)
|
||||
|
||||
// Wait for response from first tunnel before proceeding to attempt other HA edge tunnels
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
<-s.tunnelErrors
|
||||
@@ -213,6 +246,7 @@ func (s *Supervisor) initialize(
|
||||
return errEarlyShutdown
|
||||
case <-connectedSignal.Wait():
|
||||
}
|
||||
|
||||
// At least one successful connection, so start the rest
|
||||
for i := 1; i < s.config.HAConnections; i++ {
|
||||
ch := signal.New(make(chan struct{}))
|
||||
@@ -229,102 +263,42 @@ func (s *Supervisor) startFirstTunnel(
|
||||
connectedSignal *signal.Signal,
|
||||
) {
|
||||
var (
|
||||
addr *allregions.EdgeAddr
|
||||
err error
|
||||
err error
|
||||
)
|
||||
const firstConnIndex = 0
|
||||
defer func() {
|
||||
s.tunnelErrors <- tunnelError{index: firstConnIndex, err: err}
|
||||
}()
|
||||
|
||||
addr, err = s.edgeIPs.GetAddr(firstConnIndex)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, connectedSignal)
|
||||
|
||||
err = ServeTunnelLoop(
|
||||
ctx,
|
||||
s.reconnectCredentialManager,
|
||||
s.config,
|
||||
s.orchestrator,
|
||||
addr,
|
||||
s.log,
|
||||
firstConnIndex,
|
||||
connectedSignal,
|
||||
s.cloudflaredUUID,
|
||||
s.reconnectCh,
|
||||
s.gracefulShutdownC,
|
||||
)
|
||||
// If the first tunnel disconnects, keep restarting it.
|
||||
edgeErrors := 0
|
||||
for s.unusedIPs() {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
switch err.(type) {
|
||||
case nil:
|
||||
return
|
||||
// try the next address if it was a quic.IdleTimeoutError, dialError(network problem) or
|
||||
// dupConnRegisterTunnelError
|
||||
case *quic.IdleTimeoutError, edgediscovery.DialError, connection.DupConnRegisterTunnelError:
|
||||
edgeErrors++
|
||||
default:
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
if edgeErrors >= 2 {
|
||||
addr, err = s.edgeIPs.GetDifferentAddr(firstConnIndex)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
err = ServeTunnelLoop(
|
||||
ctx,
|
||||
s.reconnectCredentialManager,
|
||||
s.config,
|
||||
s.orchestrator,
|
||||
addr,
|
||||
s.log,
|
||||
firstConnIndex,
|
||||
connectedSignal,
|
||||
s.cloudflaredUUID,
|
||||
s.reconnectCh,
|
||||
s.gracefulShutdownC,
|
||||
)
|
||||
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, connectedSignal)
|
||||
}
|
||||
}
|
||||
|
||||
// startTunnel starts a new tunnel connection. The resulting error will be sent on
|
||||
// s.tunnelErrors.
|
||||
// s.tunnelError as this is expected to run in a goroutine.
|
||||
func (s *Supervisor) startTunnel(
|
||||
ctx context.Context,
|
||||
index int,
|
||||
connectedSignal *signal.Signal,
|
||||
) {
|
||||
var (
|
||||
addr *allregions.EdgeAddr
|
||||
err error
|
||||
err error
|
||||
)
|
||||
defer func() {
|
||||
s.tunnelErrors <- tunnelError{index: index, err: err}
|
||||
}()
|
||||
|
||||
addr, err = s.edgeIPs.GetDifferentAddr(index)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = ServeTunnelLoop(
|
||||
ctx,
|
||||
s.reconnectCredentialManager,
|
||||
s.config,
|
||||
s.orchestrator,
|
||||
addr,
|
||||
s.log,
|
||||
uint8(index),
|
||||
connectedSignal,
|
||||
s.cloudflaredUUID,
|
||||
s.reconnectCh,
|
||||
s.gracefulShutdownC,
|
||||
)
|
||||
err = s.edgeTunnelServer.Serve(ctx, uint8(index), connectedSignal)
|
||||
}
|
||||
|
||||
func (s *Supervisor) newConnectedTunnelSignal(index int) *signal.Signal {
|
||||
|
Reference in New Issue
Block a user