TUN-5749: Refactor cloudflared to pave way for reconfigurable ingress

- Split origin into supervisor and proxy packages
- Create configManager to handle dynamic config
cthuang
2022-02-07 09:42:07 +00:00
parent ff4cfeda0c
commit e22422aafb
33 changed files with 317 additions and 220 deletions
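For orientation, here is a minimal caller-side sketch (not part of this commit) of how the refactored pieces fit together: the CLI layer builds a DynamicConfig for the new configManager and hands everything to supervisor.StartTunnelDaemon. Names and setup below are illustrative only.

package example

import (
	"context"

	"github.com/cloudflare/cloudflared/ingress"
	"github.com/cloudflare/cloudflared/signal"
	"github.com/cloudflare/cloudflared/supervisor"
)

// RunTunnel is an illustrative sketch; in cloudflared the TunnelConfig and
// ingress rules are populated by the CLI layer.
func RunTunnel(ctx context.Context, config *supervisor.TunnelConfig, ing *ingress.Ingress) error {
	dynamicConfig := &supervisor.DynamicConfig{
		Ingress:            ing,
		WarpRoutingEnabled: false, // set true to enable warp-routing
	}
	connectedSignal := signal.New(make(chan struct{})) // notified once the first connection registers
	reconnectCh := make(chan supervisor.ReconnectSignal, 1)
	gracefulShutdownC := make(chan struct{})
	return supervisor.StartTunnelDaemon(ctx, config, dynamicConfig, connectedSignal, reconnectCh, gracefulShutdownC)
}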


@@ -0,0 +1,117 @@
package supervisor
import (
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/cloudflare/golibs/lrucache"
)
// StatusPage.io API docs:
// https://www.cloudflarestatus.com/api/v2/#incidents-unresolved
const (
activeIncidentsURL = "https://yh6f0r4529hb.statuspage.io/api/v2/incidents/unresolved.json"
argoTunnelKeyword = "argo tunnel"
incidentDetailsPrefix = "https://www.cloudflarestatus.com/incidents/"
)
// IncidentLookup is an object that checks for active incidents in
// the Cloudflare infrastructure.
type IncidentLookup interface {
ActiveIncidents() []Incident
}
// NewIncidentLookup returns a new IncidentLookup instance that caches its
// results with a 1-minute TTL.
func NewIncidentLookup() IncidentLookup {
return newCachedIncidentLookup(fetchActiveIncidents)
}
type IncidentUpdate struct {
Body string
}
type Incident struct {
Name string
ID string `json:"id"`
Updates []IncidentUpdate `json:"incident_updates"`
}
type StatusPage struct {
Incidents []Incident
}
func (i Incident) URL() string {
return incidentDetailsPrefix + i.ID
}
func parseStatusPage(data []byte) (*StatusPage, error) {
var result StatusPage
err := json.Unmarshal(data, &result)
return &result, err
}
func isArgoTunnelIncident(i Incident) bool {
if strings.Contains(strings.ToLower(i.Name), argoTunnelKeyword) {
return true
}
for _, u := range i.Updates {
if strings.Contains(strings.ToLower(u.Body), argoTunnelKeyword) {
return true
}
}
return false
}
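// fetchActiveIncidents returns the unresolved incidents that mention Argo Tunnel.
// Network and parse failures are deliberately swallowed; the caller just sees an empty result.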
func fetchActiveIncidents() (incidents []Incident) {
resp, err := http.Get(activeIncidentsURL)
if err != nil {
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
statusPage, err := parseStatusPage(body)
if err != nil {
return
}
for _, i := range statusPage.Incidents {
if isArgoTunnelIncident(i) {
incidents = append(incidents, i)
}
}
return incidents
}
type cachedIncidentLookup struct {
cache *lrucache.LRUCache
ttl time.Duration
uncachedLookup func() []Incident
}
func newCachedIncidentLookup(uncachedLookup func() []Incident) *cachedIncidentLookup {
return &cachedIncidentLookup{
cache: lrucache.NewLRUCache(1),
ttl: time.Minute,
uncachedLookup: uncachedLookup,
}
}
// We only need one cache entry. Always use the empty string as its key.
const cacheKey = ""
func (c *cachedIncidentLookup) ActiveIncidents() []Incident {
if cached, ok := c.cache.GetNotStale(cacheKey); ok {
if incidents, ok := cached.([]Incident); ok {
return incidents
}
}
incidents := c.uncachedLookup()
c.cache.Set(cacheKey, incidents, time.Now().Add(c.ttl))
return incidents
}
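As a quick illustration (not part of the diff): because results are cached with a 1-minute TTL, the lookup is cheap to poll. A hedged usage sketch; in this commit the real consumer is activeIncidentsMsg in supervisor/tunnel.go.

package example

import (
	"fmt"

	"github.com/cloudflare/cloudflared/supervisor"
)

// PrintIncidents is illustrative only.
func PrintIncidents() {
	lookup := supervisor.NewIncidentLookup()
	// Repeat calls within the 1-minute TTL are served from the single-entry LRU cache.
	for _, incident := range lookup.ActiveIncidents() {
		fmt.Printf("%s (%s)\n", incident.Name, incident.URL())
	}
}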


@@ -0,0 +1,384 @@
package supervisor
import (
"testing"
"time"
"github.com/cloudflare/golibs/lrucache"
"github.com/stretchr/testify/assert"
)
func TestParseStatusPage(t *testing.T) {
testCases := []struct {
input []byte
output *StatusPage
fail bool
}{
{
input: []byte(`<html>
<head><title>504 Gateway Time-out</title></head>
<body><center><h1>504 Gateway Time-out</h1></center></body>
</html>`),
output: nil,
fail: true,
},
{
input: []byte(`{
"page": {
"id": "yh6f0r4529hb",
"name": "Cloudflare",
"url": "https://www.cloudflarestatus.com",
"time_zone": "Etc/UTC",
"updated_at": "2019-01-10T20:11:38.750Z"
},
"incidents": [
{
"name": "Cloudflare API service issues",
"status": "resolved",
"created_at": "2018-09-17T19:29:21.132Z",
"updated_at": "2018-09-18T07:45:41.313Z",
"monitoring_at": "2018-09-17T21:35:06.492Z",
"resolved_at": "2018-09-18T07:45:41.290Z",
"shortlink": "http://stspg.io/7f079791e",
"id": "q746ybtyb6q0",
"page_id": "yh6f0r4529hb",
"incident_updates": [
{
"status": "resolved",
"body": "Cloudflare has resolved the issue and the service have resumed normal operation.",
"created_at": "2018-09-18T07:45:41.290Z",
"updated_at": "2018-09-18T07:45:41.290Z",
"display_at": "2018-09-18T07:45:41.290Z",
"affected_components": [
{
"code": "g4tb35rs9yw7",
"name": "Cloudflare customer dashboard and APIs - Cloudflare APIs",
"old_status": "operational",
"new_status": "operational"
}
],
"deliver_notifications": true,
"tweet_id": null,
"id": "zl5g2pl5zhfs",
"incident_id": "q746ybtyb6q0",
"custom_tweet": null
},
{
"status": "monitoring",
"body": "Cloudflare has implemented a fix for this issue and is currently monitoring the results.\r\n\r\nWe will update the status once the issue is resolved.",
"created_at": "2018-09-17T21:35:06.492Z",
"updated_at": "2018-09-17T21:35:06.492Z",
"display_at": "2018-09-17T21:35:06.492Z",
"affected_components": [
{
"code": "g4tb35rs9yw7",
"name": "Cloudflare customer dashboard and APIs - Cloudflare APIs",
"old_status": "degraded_performance",
"new_status": "operational"
}
],
"deliver_notifications": false,
"tweet_id": null,
"id": "0001sv3chdnx",
"incident_id": "q746ybtyb6q0",
"custom_tweet": null
},
{
"status": "investigating",
"body": "We are continuing to investigate this issue.",
"created_at": "2018-09-17T19:30:08.049Z",
"updated_at": "2018-09-17T19:30:08.049Z",
"display_at": "2018-09-17T19:30:08.049Z",
"affected_components": [
{
"code": "g4tb35rs9yw7",
"name": "Cloudflare customer dashboard and APIs - Cloudflare APIs",
"old_status": "operational",
"new_status": "degraded_performance"
}
],
"deliver_notifications": false,
"tweet_id": null,
"id": "qdr164tfpq7m",
"incident_id": "q746ybtyb6q0",
"custom_tweet": null
},
{
"status": "investigating",
"body": "Cloudflare is investigating issues with APIs and Page Rule delays for Page Rule updates. Cloudflare Page Rule service delivery is unaffected and is operating normally. Also, these issues do not affect the Cloudflare CDN and therefore, do not impact customer websites.",
"created_at": "2018-09-17T19:29:21.201Z",
"updated_at": "2018-09-17T19:29:21.201Z",
"display_at": "2018-09-17T19:29:21.201Z",
"affected_components": [
{
"code": "g4tb35rs9yw7",
"name": "Cloudflare customer dashboard and APIs - Cloudflare APIs",
"old_status": "operational",
"new_status": "operational"
}
],
"deliver_notifications": false,
"tweet_id": null,
"id": "qzl2n0q8tskg",
"incident_id": "q746ybtyb6q0",
"custom_tweet": null
}
],
"components": [
{
"status": "operational",
"name": "Cloudflare APIs",
"created_at": "2014-10-09T03:32:07.158Z",
"updated_at": "2019-01-01T22:58:30.846Z",
"position": 2,
"description": null,
"showcase": false,
"id": "g4tb35rs9yw7",
"page_id": "yh6f0r4529hb",
"group_id": "1km35smx8p41",
"group": false,
"only_show_if_degraded": false
}
],
"impact": "minor"
},
{
"name": "Web Analytics Delays",
"status": "resolved",
"created_at": "2018-09-17T18:05:39.907Z",
"updated_at": "2018-09-17T22:53:05.078Z",
"monitoring_at": null,
"resolved_at": "2018-09-17T22:53:05.057Z",
"shortlink": "http://stspg.io/cb208928c",
"id": "wqfk9mzs5qt1",
"page_id": "yh6f0r4529hb",
"incident_updates": [
{
"status": "resolved",
"body": "Cloudflare has resolved the issue and Web Analytics have resumed normal operation.",
"created_at": "2018-09-17T22:53:05.057Z",
"updated_at": "2018-09-17T22:53:05.057Z",
"display_at": "2018-09-17T22:53:05.057Z",
"affected_components": [
{
"code": "4c231tkdlpcl",
"name": "Cloudflare customer dashboard and APIs - Analytics",
"old_status": "degraded_performance",
"new_status": "operational"
}
],
"deliver_notifications": false,
"tweet_id": null,
"id": "93y1w00yqzk4",
"incident_id": "wqfk9mzs5qt1",
"custom_tweet": null
},
{
"status": "investigating",
"body": "There is a delay in processing Cloudflare Web Analytics. This affects timely delivery of customer data.\n\nThese delays do not impact analytics for DNS and Rate Limiting.",
"created_at": "2018-09-17T18:05:40.033Z",
"updated_at": "2018-09-17T18:05:40.033Z",
"display_at": "2018-09-17T18:05:40.033Z",
"affected_components": [
{
"code": "4c231tkdlpcl",
"name": "Cloudflare customer dashboard and APIs - Analytics",
"old_status": "operational",
"new_status": "degraded_performance"
}
],
"deliver_notifications": false,
"tweet_id": null,
"id": "362t6lv0vrpk",
"incident_id": "wqfk9mzs5qt1",
"custom_tweet": null
}
],
"components": [
{
"status": "operational",
"name": "Analytics",
"created_at": "2014-11-13T11:54:10.191Z",
"updated_at": "2018-12-31T08:20:52.349Z",
"position": 3,
"description": "Customer data",
"showcase": false,
"id": "4c231tkdlpcl",
"page_id": "yh6f0r4529hb",
"group_id": "1km35smx8p41",
"group": false,
"only_show_if_degraded": false
}
],
"impact": "minor"
}
]
}`),
output: &StatusPage{
Incidents: []Incident{
Incident{
Name: "Cloudflare API service issues",
ID: "q746ybtyb6q0",
Updates: []IncidentUpdate{
IncidentUpdate{
Body: "Cloudflare has resolved the issue and the service have resumed normal operation.",
},
IncidentUpdate{
Body: "Cloudflare has implemented a fix for this issue and is currently monitoring the results.\r\n\r\nWe will update the status once the issue is resolved.",
},
IncidentUpdate{
Body: "We are continuing to investigate this issue.",
},
IncidentUpdate{
Body: "Cloudflare is investigating issues with APIs and Page Rule delays for Page Rule updates. Cloudflare Page Rule service delivery is unaffected and is operating normally. Also, these issues do not affect the Cloudflare CDN and therefore, do not impact customer websites.",
},
},
},
Incident{
Name: "Web Analytics Delays",
ID: "wqfk9mzs5qt1",
Updates: []IncidentUpdate{
IncidentUpdate{
Body: "Cloudflare has resolved the issue and Web Analytics have resumed normal operation.",
},
IncidentUpdate{
Body: "There is a delay in processing Cloudflare Web Analytics. This affects timely delivery of customer data.\n\nThese delays do not impact analytics for DNS and Rate Limiting.",
},
},
},
},
},
fail: false,
},
}
for _, testCase := range testCases {
output, err := parseStatusPage(testCase.input)
if testCase.fail {
assert.Error(t, err)
} else {
assert.Nil(t, err)
assert.Equal(t, testCase.output, output)
}
}
}
func TestIsArgoTunnelIncident(t *testing.T) {
testCases := []struct {
input Incident
output bool
}{
{
input: Incident{},
output: false,
},
{
input: Incident{Name: "An Argo Tunnel incident"},
output: true,
},
{
input: Incident{Name: "an argo tunnel incident"},
output: true,
},
{
input: Incident{Name: "an aRgO TuNnEl incident"},
output: true,
},
{
input: Incident{Name: "an argotunnel incident"},
output: false,
},
{
input: Incident{Name: "irrelevant"},
output: false,
},
{
input: Incident{
Name: "irrelevant",
Updates: []IncidentUpdate{
IncidentUpdate{Body: "irrelevant"},
IncidentUpdate{Body: "an Argo Tunnel incident"},
IncidentUpdate{Body: "irrelevant"},
},
},
output: true,
},
{
input: Incident{
Name: "an Argo Tunnel incident",
Updates: []IncidentUpdate{
IncidentUpdate{Body: "irrelevant"},
IncidentUpdate{Body: "irrelevant"},
IncidentUpdate{Body: "irrelevant"},
},
},
output: true,
},
}
for _, testCase := range testCases {
actual := isArgoTunnelIncident(testCase.input)
assert.Equal(t, testCase.output, actual, "Test case failed: %v", testCase.input)
}
}
func TestIncidentURL(t *testing.T) {
incident := Incident{
ID: "s6k0dnn5347b",
}
assert.Equal(t, "https://www.cloudflarestatus.com/incidents/s6k0dnn5347b", incident.URL())
}
func TestNewCachedIncidentLookup(t *testing.T) {
c := newCachedIncidentLookup(func() []Incident { return nil })
assert.Equal(t, time.Minute, c.ttl)
assert.Equal(t, 1, c.cache.Capacity())
}
func TestCachedIncidentLookup(t *testing.T) {
expected := []Incident{
Incident{
Name: "An incident",
ID: "incidentID",
},
}
var shouldCallUncachedLookup bool
c := &cachedIncidentLookup{
cache: lrucache.NewLRUCache(1),
ttl: 50 * time.Millisecond,
uncachedLookup: func() []Incident {
if !shouldCallUncachedLookup {
t.Fatal("uncachedLookup shouldn't have been called")
}
return expected
},
}
shouldCallUncachedLookup = true
assert.Equal(t, expected, c.ActiveIncidents())
shouldCallUncachedLookup = false
assert.Equal(t, expected, c.ActiveIncidents())
assert.Equal(t, expected, c.ActiveIncidents())
time.Sleep(50 * time.Millisecond)
shouldCallUncachedLookup = true
assert.Equal(t, expected, c.ActiveIncidents())
}
func TestCachedIncidentLookupDoesntPanic(t *testing.T) {
expected := []Incident{
Incident{
Name: "An incident",
ID: "incidentID",
},
}
c := &cachedIncidentLookup{
cache: lrucache.NewLRUCache(1),
ttl: 50 * time.Millisecond,
uncachedLookup: func() []Incident { return expected },
}
c.cache.Set(cacheKey, 42, time.Now().Add(30*time.Minute))
actual := c.ActiveIncidents()
assert.Equal(t, expected, actual)
}


@@ -0,0 +1,55 @@
package supervisor
import (
"sync"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/proxy"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
type configManager struct {
currentVersion int32
// Only used by Update
updateLock sync.Mutex
// TODO: TUN-5698: Make proxy atomic.Value
proxy *proxy.Proxy
config *DynamicConfig
tags []tunnelpogs.Tag
log *zerolog.Logger
}
func newConfigManager(config *DynamicConfig, tags []tunnelpogs.Tag, log *zerolog.Logger) *configManager {
var warpRoutingService *ingress.WarpRoutingService
if config.WarpRoutingEnabled {
warpRoutingService = ingress.NewWarpRoutingService()
log.Info().Msgf("Warp-routing is enabled")
}
return &configManager{
// Lowest possible version; any remote configuration will have a higher version than this
currentVersion: 0,
proxy: proxy.NewOriginProxy(config.Ingress, warpRoutingService, tags, log),
config: config,
log: log,
}
}
func (cm *configManager) Update(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
// TODO: TUN-5698: make ingress configurable
return &tunnelpogs.UpdateConfigurationResponse{
LastAppliedVersion: cm.currentVersion,
}
}
func (cm *configManager) GetOriginProxy() connection.OriginProxy {
return cm.proxy
}
type DynamicConfig struct {
Ingress *ingress.Ingress
WarpRoutingEnabled bool
}
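Update is still a stub pending TUN-5698. As a hedged sketch of the intended direction only (the payload shape and helper below are assumptions, not this commit's behavior), a version-gated update inside the supervisor package might look like this, assuming encoding/json is imported:

// updateSketch is hypothetical: gate on version under updateLock, then
// rebuild the proxy from the new ingress rules (and eventually swap it
// atomically, per the TUN-5698 TODO above).
func (cm *configManager) updateSketch(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
	cm.updateLock.Lock()
	defer cm.updateLock.Unlock()
	if version <= cm.currentVersion {
		// Stale or duplicate push: report the version we already applied.
		return &tunnelpogs.UpdateConfigurationResponse{LastAppliedVersion: cm.currentVersion}
	}
	var remote struct {
		Ingress json.RawMessage `json:"ingress"` // assumed payload field
	}
	if err := json.Unmarshal(config, &remote); err != nil {
		return &tunnelpogs.UpdateConfigurationResponse{LastAppliedVersion: cm.currentVersion}
	}
	// ... parse remote.Ingress, build a new proxy.Proxy, replace cm.proxy ...
	cm.currentVersion = version
	return &tunnelpogs.UpdateConfigurationResponse{LastAppliedVersion: cm.currentVersion}
}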


@@ -0,0 +1,42 @@
package supervisor
import (
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/tunnelstate"
)
type ConnAwareLogger struct {
tracker *tunnelstate.ConnTracker
logger *zerolog.Logger
}
func NewConnAwareLogger(logger *zerolog.Logger, observer *connection.Observer) *ConnAwareLogger {
connAwareLogger := &ConnAwareLogger{
tracker: tunnelstate.NewConnTracker(logger),
logger: logger,
}
observer.RegisterSink(connAwareLogger.tracker)
return connAwareLogger
}
func (c *ConnAwareLogger) ReplaceLogger(logger *zerolog.Logger) *ConnAwareLogger {
return &ConnAwareLogger{
tracker: c.tracker,
logger: logger,
}
}
func (c *ConnAwareLogger) ConnAwareLogger() *zerolog.Event {
if c.tracker.CountActiveConns() == 0 {
return c.logger.Error()
}
return c.logger.Warn()
}
func (c *ConnAwareLogger) Logger() *zerolog.Logger {
return c.logger
}
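The effect, in a hedged sketch (illustrative, mirroring how supervisor/tunnel.go below reports edge dial failures): the same call site logs at Warn while at least one HA connection is up, and escalates to Error once the tracker reports zero active connections.

package example

import "github.com/cloudflare/cloudflared/supervisor"

// ReportDialFailure is illustrative only.
func ReportDialFailure(log *supervisor.ConnAwareLogger, err error) {
	// Warn if some connections survive; Error if none do.
	log.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge")
}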


@@ -0,0 +1,21 @@
package supervisor
import (
"time"
)
type ReconnectSignal struct {
// Wait this long before re-establishing the connection
Delay time.Duration
}
// Error allows us to use ReconnectSignal as a special error to force connection abort
func (r ReconnectSignal) Error() string {
return "reconnect signal"
}
func (r ReconnectSignal) DelayBeforeReconnect() {
if r.Delay > 0 {
time.Sleep(r.Delay)
}
}
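Because ReconnectSignal implements error, it can propagate through an errgroup to abort a connection and then be handled by the retry loop. A condensed, illustrative sketch of the pattern used by listenReconnect and ServeTunnel in supervisor/tunnel.go below:

package example

import (
	"context"

	"github.com/cloudflare/cloudflared/supervisor"
)

// AwaitReconnect is illustrative: a ReconnectSignal received here surfaces as
// the error that tears the connection down.
func AwaitReconnect(ctx context.Context, reconnectCh <-chan supervisor.ReconnectSignal) error {
	select {
	case r := <-reconnectCh:
		return r // returned as an error
	case <-ctx.Done():
		return nil
	}
}

// HandleServeError is illustrative: honour the requested delay before redialing.
func HandleServeError(err error) {
	if r, ok := err.(supervisor.ReconnectSignal); ok {
		r.DelayBeforeReconnect() // sleeps for r.Delay
	}
}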

supervisor/metrics.go (new file, 27 lines)

@@ -0,0 +1,27 @@
package supervisor
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/cloudflare/cloudflared/connection"
)
// Metrics use connection.MetricsNamespace (aka cloudflared) as the namespace and connection.TunnelSubsystem
// (tunnel) as the subsystem, to keep them consistent with the previous qualifier.
var (
haConnections = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: connection.MetricsNamespace,
Subsystem: connection.TunnelSubsystem,
Name: "ha_connections",
Help: "Number of active ha connections",
},
)
)
func init() {
prometheus.MustRegister(
haConnections,
)
}

supervisor/reconnect.go (new file, 138 lines)

@@ -0,0 +1,138 @@
package supervisor
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/cloudflare/cloudflared/retry"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
var (
errJWTUnset = errors.New("JWT unset")
)
// reconnectCredentialManager is invoked by functions in tunnel.go to
// get/set parameters for ReconnectTunnel RPC calls.
type reconnectCredentialManager struct {
mu sync.RWMutex
jwt []byte
eventDigest map[uint8][]byte
connDigest map[uint8][]byte
authSuccess prometheus.Counter
authFail *prometheus.CounterVec
}
func newReconnectCredentialManager(namespace, subsystem string, haConnections int) *reconnectCredentialManager {
authSuccess := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "tunnel_authenticate_success",
Help: "Count of successful tunnel authenticate",
},
)
authFail := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "tunnel_authenticate_fail",
Help: "Count of tunnel authenticate errors by type",
},
[]string{"error"},
)
prometheus.MustRegister(authSuccess, authFail)
return &reconnectCredentialManager{
eventDigest: make(map[uint8][]byte, haConnections),
connDigest: make(map[uint8][]byte, haConnections),
authSuccess: authSuccess,
authFail: authFail,
}
}
func (cm *reconnectCredentialManager) ReconnectToken() ([]byte, error) {
cm.mu.RLock()
defer cm.mu.RUnlock()
if cm.jwt == nil {
return nil, errJWTUnset
}
return cm.jwt, nil
}
func (cm *reconnectCredentialManager) SetReconnectToken(jwt []byte) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.jwt = jwt
}
func (cm *reconnectCredentialManager) EventDigest(connID uint8) ([]byte, error) {
cm.mu.RLock()
defer cm.mu.RUnlock()
digest, ok := cm.eventDigest[connID]
if !ok {
return nil, fmt.Errorf("no event digest for connection %v", connID)
}
return digest, nil
}
func (cm *reconnectCredentialManager) SetEventDigest(connID uint8, digest []byte) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.eventDigest[connID] = digest
}
func (cm *reconnectCredentialManager) ConnDigest(connID uint8) ([]byte, error) {
cm.mu.RLock()
defer cm.mu.RUnlock()
digest, ok := cm.connDigest[connID]
if !ok {
return nil, fmt.Errorf("no connection digest for connection %v", connID)
}
return digest, nil
}
func (cm *reconnectCredentialManager) SetConnDigest(connID uint8, digest []byte) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.connDigest[connID] = digest
}
func (cm *reconnectCredentialManager) RefreshAuth(
ctx context.Context,
backoff *retry.BackoffHandler,
authenticate func(ctx context.Context, numPreviousAttempts int) (tunnelpogs.AuthOutcome, error),
) (retryTimer <-chan time.Time, err error) {
authOutcome, err := authenticate(ctx, backoff.Retries())
if err != nil {
cm.authFail.WithLabelValues(err.Error()).Inc()
if _, ok := backoff.GetMaxBackoffDuration(ctx); ok {
return backoff.BackoffTimer(), nil
}
return nil, err
}
// clear backoff timer
backoff.SetGracePeriod()
switch outcome := authOutcome.(type) {
case tunnelpogs.AuthSuccess:
cm.SetReconnectToken(outcome.JWT())
cm.authSuccess.Inc()
return retry.Clock.After(outcome.RefreshAfter()), nil
case tunnelpogs.AuthUnknown:
duration := outcome.RefreshAfter()
cm.authFail.WithLabelValues(outcome.Error()).Inc()
return retry.Clock.After(duration), nil
case tunnelpogs.AuthFail:
cm.authFail.WithLabelValues(outcome.Error()).Inc()
return nil, outcome
default:
err := fmt.Errorf("refresh_auth: Unexpected outcome type %T", authOutcome)
cm.authFail.WithLabelValues(err.Error()).Inc()
return nil, err
}
}
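The expected call pattern, condensed from Supervisor.Run below (illustrative sketch within package supervisor, using its existing imports): keep calling RefreshAuth whenever the returned timer fires, and stop on a hard error.

// refreshLoopSketch is illustrative: RefreshAuth returns the timer for the
// next attempt (backoff timer on retryable errors, RefreshAfter on
// success/unknown), or a nil timer plus an error on permanent failure.
func refreshLoopSketch(ctx context.Context, cm *reconnectCredentialManager, authenticate func(context.Context, int) (tunnelpogs.AuthOutcome, error)) {
	backoff := &retry.BackoffHandler{MaxRetries: refreshAuthMaxBackoff, BaseTime: refreshAuthRetryDuration, RetryForever: true}
	timer, err := cm.RefreshAuth(ctx, backoff, authenticate)
	for err == nil {
		select {
		case <-ctx.Done():
			return
		case <-timer:
			timer, err = cm.RefreshAuth(ctx, backoff, authenticate)
		}
	}
}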


@@ -0,0 +1,120 @@
package supervisor
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cloudflare/cloudflared/retry"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
func TestRefreshAuthBackoff(t *testing.T) {
rcm := newReconnectCredentialManager(t.Name(), t.Name(), 4)
var wait time.Duration
retry.Clock.After = func(d time.Duration) <-chan time.Time {
wait = d
return time.After(d)
}
backoff := &retry.BackoffHandler{MaxRetries: 3}
auth := func(ctx context.Context, n int) (tunnelpogs.AuthOutcome, error) {
return nil, fmt.Errorf("authentication failure")
}
// authentication failures should consume the backoff
for i := uint(0); i < backoff.MaxRetries; i++ {
retryChan, err := rcm.RefreshAuth(context.Background(), backoff, auth)
require.NoError(t, err)
require.NotNil(t, retryChan)
require.Greater(t, wait.Seconds(), 0.0)
require.Less(t, wait.Seconds(), ((1 << (i + 1)) * time.Second).Seconds()) // compare seconds to seconds, not to nanoseconds
}
retryChan, err := rcm.RefreshAuth(context.Background(), backoff, auth)
require.Error(t, err)
require.Nil(t, retryChan)
// now we actually make contact with the remote server
_, _ = rcm.RefreshAuth(context.Background(), backoff, func(ctx context.Context, n int) (tunnelpogs.AuthOutcome, error) {
return tunnelpogs.NewAuthUnknown(errors.New("auth unknown"), 19), nil
})
// The backoff timer should have been reset. To confirm this, make Clock.Now
// return a value after the backoff timer's grace period
retry.Clock.Now = func() time.Time {
expectedGracePeriod := time.Duration(time.Second * 2 << backoff.MaxRetries)
return time.Now().Add(expectedGracePeriod * 2)
}
_, ok := backoff.GetMaxBackoffDuration(context.Background())
require.True(t, ok)
}
func TestRefreshAuthSuccess(t *testing.T) {
rcm := newReconnectCredentialManager(t.Name(), t.Name(), 4)
var wait time.Duration
retry.Clock.After = func(d time.Duration) <-chan time.Time {
wait = d
return time.After(d)
}
backoff := &retry.BackoffHandler{MaxRetries: 3}
auth := func(ctx context.Context, n int) (tunnelpogs.AuthOutcome, error) {
return tunnelpogs.NewAuthSuccess([]byte("jwt"), 19), nil
}
retryChan, err := rcm.RefreshAuth(context.Background(), backoff, auth)
assert.NoError(t, err)
assert.NotNil(t, retryChan)
assert.Equal(t, 19*time.Hour, wait)
token, err := rcm.ReconnectToken()
assert.NoError(t, err)
assert.Equal(t, []byte("jwt"), token)
}
func TestRefreshAuthUnknown(t *testing.T) {
rcm := newReconnectCredentialManager(t.Name(), t.Name(), 4)
var wait time.Duration
retry.Clock.After = func(d time.Duration) <-chan time.Time {
wait = d
return time.After(d)
}
backoff := &retry.BackoffHandler{MaxRetries: 3}
auth := func(ctx context.Context, n int) (tunnelpogs.AuthOutcome, error) {
return tunnelpogs.NewAuthUnknown(errors.New("auth unknown"), 19), nil
}
retryChan, err := rcm.RefreshAuth(context.Background(), backoff, auth)
assert.NoError(t, err)
assert.NotNil(t, retryChan)
assert.Equal(t, 19*time.Hour, wait)
token, err := rcm.ReconnectToken()
assert.Equal(t, errJWTUnset, err)
assert.Nil(t, token)
}
func TestRefreshAuthFail(t *testing.T) {
rcm := newReconnectCredentialManager(t.Name(), t.Name(), 4)
backoff := &retry.BackoffHandler{MaxRetries: 3}
auth := func(ctx context.Context, n int) (tunnelpogs.AuthOutcome, error) {
return tunnelpogs.NewAuthFail(errors.New("auth fail")), nil
}
retryChan, err := rcm.RefreshAuth(context.Background(), backoff, auth)
assert.Error(t, err)
assert.Nil(t, retryChan)
token, err := rcm.ReconnectToken()
assert.Equal(t, errJWTUnset, err)
assert.Nil(t, token)
}

supervisor/supervisor.go (new file, 391 lines)

@@ -0,0 +1,391 @@
package supervisor
import (
"context"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/edgediscovery/allregions"
"github.com/cloudflare/cloudflared/h2mux"
"github.com/cloudflare/cloudflared/retry"
"github.com/cloudflare/cloudflared/signal"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
const (
// SRV and TXT record resolution TTL
ResolveTTL = time.Hour
// Waiting time before retrying a failed tunnel connection
tunnelRetryDuration = time.Second * 10
// Interval between registering new tunnels
registrationInterval = time.Second
subsystemRefreshAuth = "refresh_auth"
// Maximum exponent for 'Authenticate' exponential backoff
refreshAuthMaxBackoff = 10
// Waiting time before retrying a failed 'Authenticate' connection
refreshAuthRetryDuration = time.Second * 10
)
// Supervisor manages non-declarative tunnels. It establishes TCP connections with the edge and
// reconnects them if they disconnect.
type Supervisor struct {
cloudflaredUUID uuid.UUID
configManager *configManager
config *TunnelConfig
edgeIPs *edgediscovery.Edge
tunnelErrors chan tunnelError
tunnelsConnecting map[int]chan struct{}
// nextConnectedIndex and nextConnectedSignal are used to wait for all
// currently-connecting tunnels to finish connecting so we can reset the backoff timer
nextConnectedIndex int
nextConnectedSignal chan struct{}
log *ConnAwareLogger
logTransport *zerolog.Logger
reconnectCredentialManager *reconnectCredentialManager
useReconnectToken bool
reconnectCh chan ReconnectSignal
gracefulShutdownC <-chan struct{}
}
var errEarlyShutdown = errors.New("shutdown started")
type tunnelError struct {
index int
addr *allregions.EdgeAddr
err error
}
func NewSupervisor(config *TunnelConfig, dynamicConfig *DynamicConfig, reconnectCh chan ReconnectSignal, gracefulShutdownC <-chan struct{}) (*Supervisor, error) {
cloudflaredUUID, err := uuid.NewRandom()
if err != nil {
return nil, fmt.Errorf("failed to generate cloudflared instance ID: %w", err)
}
var edgeIPs *edgediscovery.Edge
if len(config.EdgeAddrs) > 0 {
edgeIPs, err = edgediscovery.StaticEdge(config.Log, config.EdgeAddrs)
} else {
edgeIPs, err = edgediscovery.ResolveEdge(config.Log, config.Region)
}
if err != nil {
return nil, err
}
useReconnectToken := false
if config.ClassicTunnel != nil {
useReconnectToken = config.ClassicTunnel.UseReconnectToken
}
return &Supervisor{
cloudflaredUUID: cloudflaredUUID,
config: config,
configManager: newConfigManager(dynamicConfig, config.Tags, config.Log),
edgeIPs: edgeIPs,
tunnelErrors: make(chan tunnelError),
tunnelsConnecting: map[int]chan struct{}{},
log: NewConnAwareLogger(config.Log, config.Observer),
logTransport: config.LogTransport,
reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections),
useReconnectToken: useReconnectToken,
reconnectCh: reconnectCh,
gracefulShutdownC: gracefulShutdownC,
}, nil
}
func (s *Supervisor) Run(
ctx context.Context,
connectedSignal *signal.Signal,
) error {
if err := s.initialize(ctx, connectedSignal); err != nil {
if err == errEarlyShutdown {
return nil
}
return err
}
var tunnelsWaiting []int
tunnelsActive := s.config.HAConnections
backoff := retry.BackoffHandler{MaxRetries: s.config.Retries, BaseTime: tunnelRetryDuration, RetryForever: true}
var backoffTimer <-chan time.Time
refreshAuthBackoff := &retry.BackoffHandler{MaxRetries: refreshAuthMaxBackoff, BaseTime: refreshAuthRetryDuration, RetryForever: true}
var refreshAuthBackoffTimer <-chan time.Time
if s.useReconnectToken {
if timer, err := s.reconnectCredentialManager.RefreshAuth(ctx, refreshAuthBackoff, s.authenticate); err == nil {
refreshAuthBackoffTimer = timer
} else {
s.log.Logger().Err(err).
Dur("refreshAuthRetryDuration", refreshAuthRetryDuration).
Msgf("supervisor: initial refreshAuth failed, retrying in %v", refreshAuthRetryDuration)
refreshAuthBackoffTimer = time.After(refreshAuthRetryDuration)
}
}
shuttingDown := false
for {
select {
// Context cancelled
case <-ctx.Done():
for tunnelsActive > 0 {
<-s.tunnelErrors
tunnelsActive--
}
return nil
// startTunnel returned with error
// (note that this may also be caused by context cancellation)
case tunnelError := <-s.tunnelErrors:
tunnelsActive--
if tunnelError.err != nil && !shuttingDown {
s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
tunnelsWaiting = append(tunnelsWaiting, tunnelError.index)
s.waitForNextTunnel(tunnelError.index)
if backoffTimer == nil {
backoffTimer = backoff.BackoffTimer()
}
// Previously we'd mark the edge address as bad here, but now we'll just silently use another.
} else if tunnelsActive == 0 {
// all connected tunnels exited gracefully, no more work to do
return nil
}
// Backoff was set and its timer expired
case <-backoffTimer:
backoffTimer = nil
for _, index := range tunnelsWaiting {
go s.startTunnel(ctx, index, s.newConnectedTunnelSignal(index))
}
tunnelsActive += len(tunnelsWaiting)
tunnelsWaiting = nil
// Time to call Authenticate
case <-refreshAuthBackoffTimer:
newTimer, err := s.reconnectCredentialManager.RefreshAuth(ctx, refreshAuthBackoff, s.authenticate)
if err != nil {
s.log.Logger().Err(err).Msg("supervisor: Authentication failed")
// Permanent failure. Leave the `select` without setting the
// channel to be non-nil, so we'll never hit this case of the `select` again.
continue
}
refreshAuthBackoffTimer = newTimer
// Tunnel successfully connected
case <-s.nextConnectedSignal:
if !s.waitForNextTunnel(s.nextConnectedIndex) && len(tunnelsWaiting) == 0 {
// No more tunnels outstanding, clear backoff timer
backoff.SetGracePeriod()
}
case <-s.gracefulShutdownC:
shuttingDown = true
}
}
}
// Returns nil if initialization succeeded, else the initialization error.
func (s *Supervisor) initialize(
ctx context.Context,
connectedSignal *signal.Signal,
) error {
availableAddrs := s.edgeIPs.AvailableAddrs()
if s.config.HAConnections > availableAddrs {
s.log.Logger().Info().Msgf("You requested %d HA connections but I can give you at most %d.", s.config.HAConnections, availableAddrs)
s.config.HAConnections = availableAddrs
}
go s.startFirstTunnel(ctx, connectedSignal)
select {
case <-ctx.Done():
<-s.tunnelErrors
return ctx.Err()
case tunnelError := <-s.tunnelErrors:
return tunnelError.err
case <-s.gracefulShutdownC:
return errEarlyShutdown
case <-connectedSignal.Wait():
}
// At least one successful connection, so start the rest
for i := 1; i < s.config.HAConnections; i++ {
ch := signal.New(make(chan struct{}))
go s.startTunnel(ctx, i, ch)
time.Sleep(registrationInterval)
}
return nil
}
// startFirstTunnel starts the first tunnel connection. The resulting error will be sent on
// s.tunnelErrors. It will send a signal via connectedSignal if registration succeeds
func (s *Supervisor) startFirstTunnel(
ctx context.Context,
connectedSignal *signal.Signal,
) {
var (
addr *allregions.EdgeAddr
err error
)
const firstConnIndex = 0
defer func() {
s.tunnelErrors <- tunnelError{index: firstConnIndex, addr: addr, err: err}
}()
addr, err = s.edgeIPs.GetAddr(firstConnIndex)
if err != nil {
return
}
err = ServeTunnelLoop(
ctx,
s.reconnectCredentialManager,
s.configManager,
s.config,
addr,
s.log,
firstConnIndex,
connectedSignal,
s.cloudflaredUUID,
s.reconnectCh,
s.gracefulShutdownC,
)
// If the first tunnel disconnects, keep restarting it.
edgeErrors := 0
for s.unusedIPs() {
if ctx.Err() != nil {
return
}
switch err.(type) {
case nil:
return
// try the next address if it was a DialError (network problem) or a
// DupConnRegisterTunnelError
case edgediscovery.DialError, connection.DupConnRegisterTunnelError:
edgeErrors++
default:
return
}
if edgeErrors >= 2 {
addr, err = s.edgeIPs.GetDifferentAddr(firstConnIndex)
if err != nil {
return
}
}
err = ServeTunnelLoop(
ctx,
s.reconnectCredentialManager,
s.configManager,
s.config,
addr,
s.log,
firstConnIndex,
connectedSignal,
s.cloudflaredUUID,
s.reconnectCh,
s.gracefulShutdownC,
)
}
}
// startTunnel starts a new tunnel connection. The resulting error will be sent on
// s.tunnelErrors.
func (s *Supervisor) startTunnel(
ctx context.Context,
index int,
connectedSignal *signal.Signal,
) {
var (
addr *allregions.EdgeAddr
err error
)
defer func() {
s.tunnelErrors <- tunnelError{index: index, addr: addr, err: err}
}()
addr, err = s.edgeIPs.GetDifferentAddr(index)
if err != nil {
return
}
err = ServeTunnelLoop(
ctx,
s.reconnectCredentialManager,
s.configManager,
s.config,
addr,
s.log,
uint8(index),
connectedSignal,
s.cloudflaredUUID,
s.reconnectCh,
s.gracefulShutdownC,
)
}
func (s *Supervisor) newConnectedTunnelSignal(index int) *signal.Signal {
sig := make(chan struct{})
s.tunnelsConnecting[index] = sig
s.nextConnectedSignal = sig
s.nextConnectedIndex = index
return signal.New(sig)
}
func (s *Supervisor) waitForNextTunnel(index int) bool {
delete(s.tunnelsConnecting, index)
s.nextConnectedSignal = nil
for k, v := range s.tunnelsConnecting {
s.nextConnectedIndex = k
s.nextConnectedSignal = v
return true
}
return false
}
func (s *Supervisor) unusedIPs() bool {
return s.edgeIPs.AvailableAddrs() > s.config.HAConnections
}
func (s *Supervisor) authenticate(ctx context.Context, numPreviousAttempts int) (tunnelpogs.AuthOutcome, error) {
arbitraryEdgeIP, err := s.edgeIPs.GetAddrForRPC()
if err != nil {
return nil, err
}
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, s.config.EdgeTLSConfigs[connection.H2mux], arbitraryEdgeIP.TCP)
if err != nil {
return nil, err
}
defer edgeConn.Close()
handler := h2mux.MuxedStreamFunc(func(*h2mux.MuxedStream) error {
// This callback is invoked by h2mux when the edge initiates a stream.
return nil // noop
})
muxerConfig := s.config.MuxerConfig.H2MuxerConfig(handler, s.logTransport)
muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig, h2mux.ActiveStreams)
if err != nil {
return nil, err
}
go muxer.Serve(ctx)
defer func() {
// If we don't wait for the muxer shutdown here, edgeConn.Close() runs before the muxer connections are done,
// and the user sees log noise: "error writing data", "connection closed unexpectedly"
<-muxer.Shutdown()
}()
stream, err := muxer.OpenRPCStream(ctx)
if err != nil {
return nil, err
}
rpcClient := connection.NewTunnelServerClient(ctx, stream, s.log.Logger())
defer rpcClient.Close()
const arbitraryConnectionID = uint8(0)
registrationOptions := s.config.registrationOptions(arbitraryConnectionID, edgeConn.LocalAddr().String(), s.cloudflaredUUID)
registrationOptions.NumPreviousAttempts = uint8(numPreviousAttempts)
return rpcClient.Authenticate(ctx, s.config.ClassicTunnel, registrationOptions)
}

supervisor/tunnel.go (new file, 617 lines)

@@ -0,0 +1,617 @@
package supervisor
import (
"context"
"crypto/tls"
"fmt"
"net"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/lucas-clemente/quic-go"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/edgediscovery/allregions"
"github.com/cloudflare/cloudflared/h2mux"
quicpogs "github.com/cloudflare/cloudflared/quic"
"github.com/cloudflare/cloudflared/retry"
"github.com/cloudflare/cloudflared/signal"
"github.com/cloudflare/cloudflared/tunnelrpc"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
const (
dialTimeout = 15 * time.Second
FeatureSerializedHeaders = "serialized_headers"
FeatureQuickReconnects = "quick_reconnects"
)
type TunnelConfig struct {
GracePeriod time.Duration
ReplaceExisting bool
OSArch string
ClientID string
CloseConnOnce *sync.Once // Used to close connectedSignal no more than once
EdgeAddrs []string
Region string
HAConnections int
IncidentLookup IncidentLookup
IsAutoupdated bool
LBPool string
Tags []tunnelpogs.Tag
Log *zerolog.Logger
LogTransport *zerolog.Logger
Observer *connection.Observer
ReportedVersion string
Retries uint
RunFromTerminal bool
NamedTunnel *connection.NamedTunnelProperties
ClassicTunnel *connection.ClassicTunnelProperties
MuxerConfig *connection.MuxerConfig
ProtocolSelector connection.ProtocolSelector
EdgeTLSConfigs map[connection.Protocol]*tls.Config
}
func (c *TunnelConfig) registrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {
policy := tunnelrpc.ExistingTunnelPolicy_balance
if c.HAConnections <= 1 && c.LBPool == "" {
policy = tunnelrpc.ExistingTunnelPolicy_disconnect
}
return &tunnelpogs.RegistrationOptions{
ClientID: c.ClientID,
Version: c.ReportedVersion,
OS: c.OSArch,
ExistingTunnelPolicy: policy,
PoolName: c.LBPool,
Tags: c.Tags,
ConnectionID: connectionID,
OriginLocalIP: OriginLocalIP,
IsAutoupdated: c.IsAutoupdated,
RunFromTerminal: c.RunFromTerminal,
CompressionQuality: uint64(c.MuxerConfig.CompressionSetting),
UUID: uuid.String(),
Features: c.SupportedFeatures(),
}
}
func (c *TunnelConfig) connectionOptions(originLocalAddr string, numPreviousAttempts uint8) *tunnelpogs.ConnectionOptions {
// attempt to parse out the origin IP, but don't fail, since it's an informational field
host, _, _ := net.SplitHostPort(originLocalAddr)
originIP := net.ParseIP(host)
return &tunnelpogs.ConnectionOptions{
Client: c.NamedTunnel.Client,
OriginLocalIP: originIP,
ReplaceExisting: c.ReplaceExisting,
CompressionQuality: uint8(c.MuxerConfig.CompressionSetting),
NumPreviousAttempts: numPreviousAttempts,
}
}
func (c *TunnelConfig) SupportedFeatures() []string {
features := []string{FeatureSerializedHeaders}
if c.NamedTunnel == nil {
features = append(features, FeatureQuickReconnects)
}
return features
}
func StartTunnelDaemon(
ctx context.Context,
config *TunnelConfig,
dynamicConfig *DynamicConfig,
connectedSignal *signal.Signal,
reconnectCh chan ReconnectSignal,
graceShutdownC <-chan struct{},
) error {
s, err := NewSupervisor(config, dynamicConfig, reconnectCh, graceShutdownC)
if err != nil {
return err
}
return s.Run(ctx, connectedSignal)
}
func ServeTunnelLoop(
ctx context.Context,
credentialManager *reconnectCredentialManager,
configManager *configManager,
config *TunnelConfig,
addr *allregions.EdgeAddr,
connAwareLogger *ConnAwareLogger,
connIndex uint8,
connectedSignal *signal.Signal,
cloudflaredUUID uuid.UUID,
reconnectCh chan ReconnectSignal,
gracefulShutdownC <-chan struct{},
) error {
haConnections.Inc()
defer haConnections.Dec()
logger := config.Log.With().Uint8(connection.LogFieldConnIndex, connIndex).Logger()
connLog := connAwareLogger.ReplaceLogger(&logger)
protocolFallback := &protocolFallback{
retry.BackoffHandler{MaxRetries: config.Retries},
config.ProtocolSelector.Current(),
false,
}
connectedFuse := h2mux.NewBooleanFuse()
go func() {
if connectedFuse.Await() {
connectedSignal.Notify()
}
}()
// Ensure the above goroutine will terminate if we return without connecting
defer connectedFuse.Fuse(false)
// Each connection keeps its own copy of the protocol, because individual connections might fall back
// to another protocol when a particular metal doesn't support the new protocol
for {
err, recoverable := ServeTunnel(
ctx,
connLog,
credentialManager,
configManager,
config,
addr,
connIndex,
connectedFuse,
protocolFallback,
cloudflaredUUID,
reconnectCh,
protocolFallback.protocol,
gracefulShutdownC,
)
if !recoverable {
return err
}
config.Observer.SendReconnect(connIndex)
duration, ok := protocolFallback.GetMaxBackoffDuration(ctx)
if !ok {
return err
}
connLog.Logger().Info().Msgf("Retrying connection in up to %s", duration)
select {
case <-ctx.Done():
return ctx.Err()
case <-gracefulShutdownC:
return nil
case <-protocolFallback.BackoffTimer():
if !selectNextProtocol(
connLog.Logger(),
protocolFallback,
config.ProtocolSelector,
err,
) {
return err
}
}
}
}
// protocolFallback is a wrapper around retry.BackoffHandler that tries the fallback option when backoff
// reaches max retries
type protocolFallback struct {
retry.BackoffHandler
protocol connection.Protocol
inFallback bool
}
func (pf *protocolFallback) reset() {
pf.ResetNow()
pf.inFallback = false
}
func (pf *protocolFallback) fallback(fallback connection.Protocol) {
pf.ResetNow()
pf.protocol = fallback
pf.inFallback = true
}
// selectNextProtocol picks the connection protocol for the next retry iteration;
// it returns true if it was able to pick a protocol, false if we are out of options and should stop retrying
func selectNextProtocol(
connLog *zerolog.Logger,
protocolBackoff *protocolFallback,
selector connection.ProtocolSelector,
cause error,
) bool {
var idleTimeoutError *quic.IdleTimeoutError
isNetworkActivityTimeout := errors.As(cause, &idleTimeoutError)
_, hasFallback := selector.Fallback()
if protocolBackoff.ReachedMaxRetries() || (hasFallback && isNetworkActivityTimeout) {
fallback, hasFallback := selector.Fallback()
if !hasFallback {
return false
}
// Already using the fallback protocol, no point in retrying
if protocolBackoff.protocol == fallback {
return false
}
connLog.Info().Msgf("Switching to fallback protocol %s", fallback)
protocolBackoff.fallback(fallback)
} else if !protocolBackoff.inFallback {
current := selector.Current()
if protocolBackoff.protocol != current {
protocolBackoff.protocol = current
connLog.Info().Msgf("Changing protocol to %s", current)
}
}
return true
}
// ServeTunnel runs a single tunnel connection. It returns nil on graceful shutdown;
// on error, it also returns a flag indicating whether the error is recoverable
func ServeTunnel(
ctx context.Context,
connLog *ConnAwareLogger,
credentialManager *reconnectCredentialManager,
configManager *configManager,
config *TunnelConfig,
addr *allregions.EdgeAddr,
connIndex uint8,
fuse *h2mux.BooleanFuse,
backoff *protocolFallback,
cloudflaredUUID uuid.UUID,
reconnectCh chan ReconnectSignal,
protocol connection.Protocol,
gracefulShutdownC <-chan struct{},
) (err error, recoverable bool) {
// Treat panics as recoverable errors
defer func() {
if r := recover(); r != nil {
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("ServeTunnel: %v", r)
}
err = errors.Wrapf(err, "stack trace: %s", string(debug.Stack()))
recoverable = true
}
}()
defer config.Observer.SendDisconnect(connIndex)
err, recoverable = serveTunnel(
ctx,
connLog,
credentialManager,
configManager,
config,
addr,
connIndex,
fuse,
backoff,
cloudflaredUUID,
reconnectCh,
protocol,
gracefulShutdownC,
)
if err != nil {
switch err := err.(type) {
case connection.DupConnRegisterTunnelError:
connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection.")
// don't retry this connection anymore, let supervisor pick a new address
return err, false
case connection.ServerRegisterTunnelError:
connLog.ConnAwareLogger().Err(err).Msg("Register tunnel error from server side")
// Don't send registration errors returned from the server to Sentry; they are
// logged on the server side
if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
connLog.ConnAwareLogger().Msg(activeIncidentsMsg(incidents))
}
return err.Cause, !err.Permanent
case ReconnectSignal:
connLog.Logger().Info().
Uint8(connection.LogFieldConnIndex, connIndex).
Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
err.DelayBeforeReconnect()
return err, true
default:
if err == context.Canceled {
connLog.Logger().Debug().Err(err).Msgf("Serve tunnel error")
return err, false
}
connLog.ConnAwareLogger().Err(err).Msgf("Serve tunnel error")
_, permanent := err.(unrecoverableError)
return err, !permanent
}
}
return nil, false
}
func serveTunnel(
ctx context.Context,
connLog *ConnAwareLogger,
credentialManager *reconnectCredentialManager,
configManager *configManager,
config *TunnelConfig,
addr *allregions.EdgeAddr,
connIndex uint8,
fuse *h2mux.BooleanFuse,
backoff *protocolFallback,
cloudflaredUUID uuid.UUID,
reconnectCh chan ReconnectSignal,
protocol connection.Protocol,
gracefulShutdownC <-chan struct{},
) (err error, recoverable bool) {
connectedFuse := &connectedFuse{
fuse: fuse,
backoff: backoff,
}
controlStream := connection.NewControlStream(
config.Observer,
connectedFuse,
config.NamedTunnel,
connIndex,
nil,
gracefulShutdownC,
config.GracePeriod,
)
switch protocol {
case connection.QUIC, connection.QUICWarp:
connOptions := config.connectionOptions(addr.UDP.String(), uint8(backoff.Retries()))
return ServeQUIC(ctx,
addr.UDP,
configManager,
config,
connLog,
connOptions,
controlStream,
connIndex,
reconnectCh,
gracefulShutdownC)
case connection.HTTP2, connection.HTTP2Warp:
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP)
if err != nil {
connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge")
return err, true
}
connOptions := config.connectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.Retries()))
if err := ServeHTTP2(
ctx,
connLog,
configManager,
config,
edgeConn,
connOptions,
controlStream,
connIndex,
gracefulShutdownC,
reconnectCh,
); err != nil {
return err, false
}
default:
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP)
if err != nil {
connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge")
return err, true
}
if err := ServeH2mux(
ctx,
connLog,
credentialManager,
configManager,
config,
edgeConn,
connIndex,
connectedFuse,
cloudflaredUUID,
reconnectCh,
gracefulShutdownC,
); err != nil {
return err, false
}
}
return
}
type unrecoverableError struct {
err error
}
func (r unrecoverableError) Error() string {
return r.err.Error()
}
func ServeH2mux(
ctx context.Context,
connLog *ConnAwareLogger,
credentialManager *reconnectCredentialManager,
configManager *configManager,
config *TunnelConfig,
edgeConn net.Conn,
connIndex uint8,
connectedFuse *connectedFuse,
cloudflaredUUID uuid.UUID,
reconnectCh chan ReconnectSignal,
gracefulShutdownC <-chan struct{},
) error {
connLog.Logger().Debug().Msgf("Connecting via h2mux")
// Returns errors from parsing the origin URL or from the handshake
handler, err, recoverable := connection.NewH2muxConnection(
configManager,
config.GracePeriod,
config.MuxerConfig,
edgeConn,
connIndex,
config.Observer,
gracefulShutdownC,
)
if err != nil {
if !recoverable {
return unrecoverableError{err}
}
return err
}
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
if config.NamedTunnel != nil {
connOptions := config.connectionOptions(edgeConn.LocalAddr().String(), uint8(connectedFuse.backoff.Retries()))
return handler.ServeNamedTunnel(serveCtx, config.NamedTunnel, connOptions, connectedFuse)
}
registrationOptions := config.registrationOptions(connIndex, edgeConn.LocalAddr().String(), cloudflaredUUID)
return handler.ServeClassicTunnel(serveCtx, config.ClassicTunnel, credentialManager, registrationOptions, connectedFuse)
})
errGroup.Go(func() error {
return listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
})
return errGroup.Wait()
}
func ServeHTTP2(
ctx context.Context,
connLog *ConnAwareLogger,
configManager *configManager,
config *TunnelConfig,
tlsServerConn net.Conn,
connOptions *tunnelpogs.ConnectionOptions,
controlStreamHandler connection.ControlStreamHandler,
connIndex uint8,
gracefulShutdownC <-chan struct{},
reconnectCh chan ReconnectSignal,
) error {
connLog.Logger().Debug().Msgf("Connecting via http2")
h2conn := connection.NewHTTP2Connection(
tlsServerConn,
configManager,
connOptions,
config.Observer,
connIndex,
controlStreamHandler,
config.Log,
)
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return h2conn.Serve(serveCtx)
})
errGroup.Go(func() error {
err := listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
if err != nil {
// forcefully break the connection (this is only used for testing)
_ = tlsServerConn.Close()
}
return err
})
return errGroup.Wait()
}
func ServeQUIC(
ctx context.Context,
edgeAddr *net.UDPAddr,
configManager *configManager,
config *TunnelConfig,
connLogger *ConnAwareLogger,
connOptions *tunnelpogs.ConnectionOptions,
controlStreamHandler connection.ControlStreamHandler,
connIndex uint8,
reconnectCh chan ReconnectSignal,
gracefulShutdownC <-chan struct{},
) (err error, recoverable bool) {
tlsConfig := config.EdgeTLSConfigs[connection.QUIC]
quicConfig := &quic.Config{
HandshakeIdleTimeout: quicpogs.HandshakeIdleTimeout,
MaxIdleTimeout: quicpogs.MaxIdleTimeout,
MaxIncomingStreams: connection.MaxConcurrentStreams,
MaxIncomingUniStreams: connection.MaxConcurrentStreams,
KeepAlive: true,
EnableDatagrams: true,
MaxDatagramFrameSize: quicpogs.MaxDatagramFrameSize,
Tracer: quicpogs.NewClientTracer(connLogger.Logger(), connIndex),
}
quicConn, err := connection.NewQUICConnection(
quicConfig,
edgeAddr,
tlsConfig,
configManager,
connOptions,
controlStreamHandler,
connLogger.Logger())
if err != nil {
connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection")
return err, true
}
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
err := quicConn.Serve(serveCtx)
if err != nil {
connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve quic connection")
}
return err
})
errGroup.Go(func() error {
err := listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
if err != nil {
// forcefully break the connection (this is only used for testing)
quicConn.Close()
}
return err
})
return errGroup.Wait(), false
}
func listenReconnect(ctx context.Context, reconnectCh <-chan ReconnectSignal, gracefulShutdownCh <-chan struct{}) error {
select {
case reconnect := <-reconnectCh:
return reconnect
case <-gracefulShutdownCh:
return nil
case <-ctx.Done():
return nil
}
}
type connectedFuse struct {
fuse *h2mux.BooleanFuse
backoff *protocolFallback
}
func (cf *connectedFuse) Connected() {
cf.fuse.Fuse(true)
cf.backoff.reset()
}
func (cf *connectedFuse) IsConnected() bool {
return cf.fuse.Value()
}
func activeIncidentsMsg(incidents []Incident) string {
preamble := "There is an active Cloudflare incident that may be related:"
if len(incidents) > 1 {
preamble = "There are active Cloudflare incidents that may be related:"
}
incidentStrings := []string{}
for _, incident := range incidents {
incidentString := fmt.Sprintf("%s (%s)", incident.Name, incident.URL())
incidentStrings = append(incidentStrings, incidentString)
}
return preamble + " " + strings.Join(incidentStrings, "; ")
}

supervisor/tunnel_test.go (new file, 120 lines)

@@ -0,0 +1,120 @@
package supervisor
import (
"testing"
"time"
"github.com/lucas-clemente/quic-go"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/retry"
)
type dynamicMockFetcher struct {
protocolPercents edgediscovery.ProtocolPercents
err error
}
func (dmf *dynamicMockFetcher) fetch() connection.PercentageFetcher {
return func() (edgediscovery.ProtocolPercents, error) {
return dmf.protocolPercents, dmf.err
}
}
func TestWaitForBackoffFallback(t *testing.T) {
maxRetries := uint(3)
backoff := retry.BackoffHandler{
MaxRetries: maxRetries,
BaseTime: time.Millisecond * 10,
}
log := zerolog.Nop()
resolveTTL := time.Duration(0)
namedTunnel := &connection.NamedTunnelProperties{}
mockFetcher := dynamicMockFetcher{
protocolPercents: edgediscovery.ProtocolPercents{edgediscovery.ProtocolPercent{Protocol: "http2", Percentage: 100}},
}
warpRoutingEnabled := false
protocolSelector, err := connection.NewProtocolSelector(
"auto",
warpRoutingEnabled,
namedTunnel,
mockFetcher.fetch(),
resolveTTL,
&log,
)
assert.NoError(t, err)
initProtocol := protocolSelector.Current()
assert.Equal(t, connection.HTTP2, initProtocol)
protoFallback := &protocolFallback{
backoff,
initProtocol,
false,
}
// Retry #0 and #1. At retry #2, we switch protocol, so the fallback loop has one more retry than this
for i := 0; i < int(maxRetries-1); i++ {
protoFallback.BackoffTimer() // simulate retry
ok := selectNextProtocol(&log, protoFallback, protocolSelector, nil)
assert.True(t, ok)
assert.Equal(t, initProtocol, protoFallback.protocol)
}
// Retry fallback protocol
for i := 0; i < int(maxRetries); i++ {
protoFallback.BackoffTimer() // simulate retry
ok := selectNextProtocol(&log, protoFallback, protocolSelector, nil)
assert.True(t, ok)
fallback, ok := protocolSelector.Fallback()
assert.True(t, ok)
assert.Equal(t, fallback, protoFallback.protocol)
}
currentGlobalProtocol := protocolSelector.Current()
assert.Equal(t, initProtocol, currentGlobalProtocol)
// No protocol to fall back to, return error
protoFallback.BackoffTimer() // simulate retry
ok := selectNextProtocol(&log, protoFallback, protocolSelector, nil)
assert.False(t, ok)
protoFallback.reset()
protoFallback.BackoffTimer() // simulate retry
ok = selectNextProtocol(&log, protoFallback, protocolSelector, nil)
assert.True(t, ok)
assert.Equal(t, initProtocol, protoFallback.protocol)
protoFallback.reset()
protoFallback.BackoffTimer() // simulate retry
ok = selectNextProtocol(&log, protoFallback, protocolSelector, &quic.IdleTimeoutError{})
// Check that we get true on the first try itself when the error is a network-activity timeout. This allows
// us to immediately switch protocols when there is a fallback.
assert.True(t, ok)
// But if there is no fallback available, then we exhaust the retries despite the type of error.
// The reason why there's no fallback available is because we pick a specific protocol instead of letting it be auto.
protocolSelector, err = connection.NewProtocolSelector(
"quic",
warpRoutingEnabled,
namedTunnel,
mockFetcher.fetch(),
resolveTTL,
&log,
)
assert.NoError(t, err)
protoFallback = &protocolFallback{backoff, protocolSelector.Current(), false}
for i := 0; i < int(maxRetries-1); i++ {
protoFallback.BackoffTimer() // simulate retry
ok := selectNextProtocol(&log, protoFallback, protocolSelector, &quic.IdleTimeoutError{})
assert.True(t, ok)
assert.Equal(t, connection.QUIC, protoFallback.protocol)
}
// And finally it fails as it should, with no fallback.
protoFallback.BackoffTimer()
ok = selectNextProtocol(&log, protoFallback, protocolSelector, &quic.IdleTimeoutError{})
assert.False(t, ok)
}


@@ -0,0 +1,50 @@
package supervisor
import (
"fmt"
"sync"
"github.com/prometheus/client_golang/prometheus"
)
// tunnelsForHA maps this cloudflared instance's HA connections to the tunnel IDs they serve.
type tunnelsForHA struct {
sync.Mutex
metrics *prometheus.GaugeVec
entries map[uint8]string
}
// NewTunnelsForHA initializes the Prometheus metrics and bookkeeping for a tunnelsForHA.
func NewTunnelsForHA() tunnelsForHA {
metrics := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "tunnel_ids",
Help: "The ID of all tunnels (and their corresponding HA connection ID) running in this instance of cloudflared.",
},
[]string{"tunnel_id", "ha_conn_id"},
)
prometheus.MustRegister(metrics)
return tunnelsForHA{
metrics: metrics,
entries: make(map[uint8]string),
}
}
// AddTunnelID tracks a new tunnel ID, removing the disconnected tunnel (if any) and updating metrics.
func (t *tunnelsForHA) AddTunnelID(haConn uint8, tunnelID string) {
t.Lock()
defer t.Unlock()
haStr := fmt.Sprintf("%v", haConn)
if oldTunnelID, ok := t.entries[haConn]; ok {
t.metrics.WithLabelValues(oldTunnelID, haStr).Dec()
}
t.entries[haConn] = tunnelID
t.metrics.WithLabelValues(tunnelID, haStr).Inc()
}
func (t *tunnelsForHA) String() string {
t.Lock()
defer t.Unlock()
return fmt.Sprintf("%v", t.entries)
}
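A brief in-package sketch of the tracker's behavior (illustrative only): re-registering an HA connection index moves the gauge from the old tunnel ID to the new one.

// trackHASketch is illustrative, inside package supervisor.
func trackHASketch() {
	tracker := NewTunnelsForHA()
	tracker.AddTunnelID(0, "tunnel-abc") // tunnel_ids{tunnel_id="tunnel-abc",ha_conn_id="0"} -> 1
	tracker.AddTunnelID(0, "tunnel-def") // old pair decremented, new pair incremented
	fmt.Println(tracker.String()) // map[0:tunnel-def]
}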