mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 19:29:57 +00:00
TUN-9470: Add OriginDialerService to include TCP
Adds an OriginDialerService that takes over the roles of both DialUDP and DialTCP towards the origin. This provides the possibility to leverage dialer "middleware" to inject virtual origins, such as the DNS resolver service. DNS Resolver service also gains access to the DialTCP operation to service TCP DNS requests. Minor refactoring includes changes to remove the needs previously provided by the warp-routing configuration. This configuration cannot be disabled by cloudflared so many of the references have been adjusted or removed. Closes TUN-9470
This commit is contained in:
@@ -2,7 +2,6 @@ package orchestration
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/cloudflare/cloudflared/config"
|
||||
"github.com/cloudflare/cloudflared/ingress"
|
||||
@@ -20,9 +19,9 @@ type newLocalConfig struct {
|
||||
|
||||
// Config is the original config as read and parsed by cloudflared.
|
||||
type Config struct {
|
||||
Ingress *ingress.Ingress
|
||||
WarpRouting ingress.WarpRoutingConfig
|
||||
WriteTimeout time.Duration
|
||||
Ingress *ingress.Ingress
|
||||
WarpRouting ingress.WarpRoutingConfig
|
||||
OriginDialerService *ingress.OriginDialerService
|
||||
|
||||
// Extra settings used to configure this instance but that are not eligible for remotely management
|
||||
// ie. (--protocol, --loglevel, ...)
|
||||
|
@@ -38,7 +38,9 @@ type Orchestrator struct {
|
||||
tags []pogs.Tag
|
||||
// flowLimiter tracks active sessions across the tunnel and limits new sessions if they are above the limit.
|
||||
flowLimiter cfdflow.Limiter
|
||||
log *zerolog.Logger
|
||||
// Origin dialer service to manage egress socket dialing.
|
||||
originDialerService *ingress.OriginDialerService
|
||||
log *zerolog.Logger
|
||||
|
||||
// orchestrator must not handle any more updates after shutdownC is closed
|
||||
shutdownC <-chan struct{}
|
||||
@@ -50,18 +52,20 @@ func NewOrchestrator(ctx context.Context,
|
||||
config *Config,
|
||||
tags []pogs.Tag,
|
||||
internalRules []ingress.Rule,
|
||||
log *zerolog.Logger) (*Orchestrator, error) {
|
||||
log *zerolog.Logger,
|
||||
) (*Orchestrator, error) {
|
||||
o := &Orchestrator{
|
||||
// Lowest possible version, any remote configuration will have version higher than this
|
||||
// Starting at -1 allows a configuration migration (local to remote) to override the current configuration as it
|
||||
// will start at version 0.
|
||||
currentVersion: -1,
|
||||
internalRules: internalRules,
|
||||
config: config,
|
||||
tags: tags,
|
||||
flowLimiter: cfdflow.NewLimiter(config.WarpRouting.MaxActiveFlows),
|
||||
log: log,
|
||||
shutdownC: ctx.Done(),
|
||||
currentVersion: -1,
|
||||
internalRules: internalRules,
|
||||
config: config,
|
||||
tags: tags,
|
||||
flowLimiter: cfdflow.NewLimiter(config.WarpRouting.MaxActiveFlows),
|
||||
originDialerService: config.OriginDialerService,
|
||||
log: log,
|
||||
shutdownC: ctx.Done(),
|
||||
}
|
||||
if err := o.updateIngress(*config.Ingress, config.WarpRouting); err != nil {
|
||||
return nil, err
|
||||
@@ -175,7 +179,15 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i
|
||||
// Update the flow limit since the configuration might have changed
|
||||
o.flowLimiter.SetLimit(warpRouting.MaxActiveFlows)
|
||||
|
||||
proxy := proxy.NewOriginProxy(ingressRules, warpRouting, o.tags, o.flowLimiter, o.config.WriteTimeout, o.log)
|
||||
// Update the origin dialer service with the new dialer settings
|
||||
// We need to update the dialer here instead of creating a new instance of OriginDialerService because it has
|
||||
// its own references and go routines. Specifically, the UDP dialer is a reference to this same service all the
|
||||
// way into the datagram manager. Reconstructing the datagram manager is not something we currently provide during
|
||||
// runtime in response to a configuration push except when starting a tunnel connection.
|
||||
o.originDialerService.UpdateDefaultDialer(ingress.NewDialer(warpRouting))
|
||||
|
||||
// Create and replace the origin proxy with a new instance
|
||||
proxy := proxy.NewOriginProxy(ingressRules, o.originDialerService, o.tags, o.flowLimiter, o.log)
|
||||
o.proxy.Store(proxy)
|
||||
o.config.Ingress = &ingressRules
|
||||
o.config.WarpRouting = warpRouting
|
||||
|
@@ -41,6 +41,11 @@ var (
|
||||
Value: "test",
|
||||
},
|
||||
}
|
||||
testDefaultDialer = ingress.NewDialer(ingress.WarpRoutingConfig{
|
||||
ConnectTimeout: config.CustomDuration{Duration: 1 * time.Second},
|
||||
TCPKeepAlive: config.CustomDuration{Duration: 15 * time.Second},
|
||||
MaxActiveFlows: 0,
|
||||
})
|
||||
)
|
||||
|
||||
// TestUpdateConfiguration tests that
|
||||
@@ -50,8 +55,13 @@ var (
|
||||
// - configurations can be deserialized
|
||||
// - receiving an old version is noop
|
||||
func TestUpdateConfiguration(t *testing.T) {
|
||||
originDialer := ingress.NewOriginDialer(ingress.OriginConfig{
|
||||
DefaultDialer: testDefaultDialer,
|
||||
TCPWriteTimeout: 1 * time.Second,
|
||||
}, &testLogger)
|
||||
initConfig := &Config{
|
||||
Ingress: &ingress.Ingress{},
|
||||
Ingress: &ingress.Ingress{},
|
||||
OriginDialerService: originDialer,
|
||||
}
|
||||
orchestrator, err := NewOrchestrator(t.Context(), initConfig, testTags, []ingress.Rule{ingress.NewManagementRule(management.New("management.argotunnel.com", false, "1.1.1.1:80", uuid.Nil, "", &testLogger, nil))}, &testLogger)
|
||||
require.NoError(t, err)
|
||||
@@ -179,8 +189,13 @@ func TestUpdateConfiguration(t *testing.T) {
|
||||
// Validates that a new version 0 will be applied if the configuration is loaded locally.
|
||||
// This will happen when a locally managed tunnel is migrated to remote configuration and receives its first configuration.
|
||||
func TestUpdateConfiguration_FromMigration(t *testing.T) {
|
||||
originDialer := ingress.NewOriginDialer(ingress.OriginConfig{
|
||||
DefaultDialer: testDefaultDialer,
|
||||
TCPWriteTimeout: 1 * time.Second,
|
||||
}, &testLogger)
|
||||
initConfig := &Config{
|
||||
Ingress: &ingress.Ingress{},
|
||||
Ingress: &ingress.Ingress{},
|
||||
OriginDialerService: originDialer,
|
||||
}
|
||||
orchestrator, err := NewOrchestrator(t.Context(), initConfig, testTags, []ingress.Rule{}, &testLogger)
|
||||
require.NoError(t, err)
|
||||
@@ -205,8 +220,13 @@ func TestUpdateConfiguration_FromMigration(t *testing.T) {
|
||||
|
||||
// Validates that the default ingress rule will be set if there is no rule provided from the remote.
|
||||
func TestUpdateConfiguration_WithoutIngressRule(t *testing.T) {
|
||||
originDialer := ingress.NewOriginDialer(ingress.OriginConfig{
|
||||
DefaultDialer: testDefaultDialer,
|
||||
TCPWriteTimeout: 1 * time.Second,
|
||||
}, &testLogger)
|
||||
initConfig := &Config{
|
||||
Ingress: &ingress.Ingress{},
|
||||
Ingress: &ingress.Ingress{},
|
||||
OriginDialerService: originDialer,
|
||||
}
|
||||
orchestrator, err := NewOrchestrator(t.Context(), initConfig, testTags, []ingress.Rule{}, &testLogger)
|
||||
require.NoError(t, err)
|
||||
@@ -244,6 +264,11 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer tcpOrigin.Close()
|
||||
|
||||
originDialer := ingress.NewOriginDialer(ingress.OriginConfig{
|
||||
DefaultDialer: testDefaultDialer,
|
||||
TCPWriteTimeout: 1 * time.Second,
|
||||
}, &testLogger)
|
||||
|
||||
var (
|
||||
configJSONV1 = []byte(fmt.Sprintf(`
|
||||
{
|
||||
@@ -296,7 +321,8 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
|
||||
appliedV2 = make(chan struct{})
|
||||
|
||||
initConfig = &Config{
|
||||
Ingress: &ingress.Ingress{},
|
||||
Ingress: &ingress.Ingress{},
|
||||
OriginDialerService: originDialer,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -313,7 +339,7 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
|
||||
go func() {
|
||||
serveTCPOrigin(t, tcpOrigin, &wg)
|
||||
}()
|
||||
for i := 0; i < concurrentRequests; i++ {
|
||||
for i := range concurrentRequests {
|
||||
originProxy, err := orchestrator.GetOriginProxy()
|
||||
require.NoError(t, err)
|
||||
wg.Add(1)
|
||||
@@ -323,48 +349,37 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
|
||||
assert.NoError(t, err, "proxyHTTP %d failed %v", i, err)
|
||||
defer resp.Body.Close()
|
||||
|
||||
var warpRoutingDisabled bool
|
||||
// The response can be from initOrigin, http_status:204 or http_status:418
|
||||
switch resp.StatusCode {
|
||||
// v1 proxy, warp enabled
|
||||
// v1 proxy
|
||||
case 200:
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, t.Name(), string(body))
|
||||
warpRoutingDisabled = false
|
||||
// v2 proxy, warp disabled
|
||||
// v2 proxy
|
||||
case 204:
|
||||
assert.Greater(t, i, concurrentRequests/4)
|
||||
warpRoutingDisabled = true
|
||||
// v3 proxy, warp enabled
|
||||
// v3 proxy
|
||||
case 418:
|
||||
assert.Greater(t, i, concurrentRequests/2)
|
||||
warpRoutingDisabled = false
|
||||
}
|
||||
|
||||
// Once we have originProxy, it won't be changed by configuration updates.
|
||||
// We can infer the version by the ProxyHTTP response code
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
w := newRespReadWriteFlusher()
|
||||
|
||||
// Write TCP message and make sure it's echo back. This has to be done in a go routune since ProxyTCP doesn't
|
||||
// return until the stream is closed.
|
||||
if !warpRoutingDisabled {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer pw.Close()
|
||||
tcpEyeball(t, pw, tcpBody, w)
|
||||
}()
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer pw.Close()
|
||||
tcpEyeball(t, pw, tcpBody, w)
|
||||
}()
|
||||
|
||||
err = proxyTCP(ctx, originProxy, tcpOrigin.Addr().String(), w, pr)
|
||||
if warpRoutingDisabled {
|
||||
assert.Error(t, err, "expect proxyTCP %d to return error", i)
|
||||
} else {
|
||||
assert.NoError(t, err, "proxyTCP %d failed %v", i, err)
|
||||
}
|
||||
assert.NoError(t, err, "proxyTCP %d failed %v", i, err)
|
||||
}(i, originProxy)
|
||||
|
||||
if i == concurrentRequests/4 {
|
||||
@@ -406,39 +421,47 @@ func TestOverrideWarpRoutingConfigWithLocalValues(t *testing.T) {
|
||||
require.EqualValues(t, expectedValue, warpRouting["maxActiveFlows"])
|
||||
}
|
||||
|
||||
remoteValue := uint64(100)
|
||||
remoteIngress := ingress.Ingress{}
|
||||
originDialer := ingress.NewOriginDialer(ingress.OriginConfig{
|
||||
DefaultDialer: testDefaultDialer,
|
||||
TCPWriteTimeout: 1 * time.Second,
|
||||
}, &testLogger)
|
||||
|
||||
// All the possible values set for MaxActiveFlows from the various points that can provide it:
|
||||
// 1. Initialized value
|
||||
// 2. Local CLI flag config
|
||||
// 3. Remote configuration value
|
||||
initValue := uint64(0)
|
||||
localValue := uint64(100)
|
||||
remoteValue := uint64(500)
|
||||
|
||||
initConfig := &Config{
|
||||
Ingress: &ingress.Ingress{},
|
||||
WarpRouting: ingress.WarpRoutingConfig{
|
||||
MaxActiveFlows: initValue,
|
||||
},
|
||||
OriginDialerService: originDialer,
|
||||
ConfigurationFlags: map[string]string{
|
||||
flags.MaxActiveFlows: fmt.Sprintf("%d", localValue),
|
||||
},
|
||||
}
|
||||
|
||||
// We expect the local configuration flag to be the starting value
|
||||
orchestrator, err := NewOrchestrator(ctx, initConfig, testTags, []ingress.Rule{}, &testLogger)
|
||||
require.NoError(t, err)
|
||||
|
||||
assertMaxActiveFlows(orchestrator, localValue)
|
||||
|
||||
// Assigning the MaxActiveFlows in the remote config should be ignored over the local config
|
||||
remoteWarpConfig := ingress.WarpRoutingConfig{
|
||||
MaxActiveFlows: remoteValue,
|
||||
}
|
||||
remoteConfig := &Config{
|
||||
Ingress: &remoteIngress,
|
||||
WarpRouting: remoteWarpConfig,
|
||||
ConfigurationFlags: map[string]string{},
|
||||
}
|
||||
orchestrator, err := NewOrchestrator(ctx, remoteConfig, testTags, []ingress.Rule{}, &testLogger)
|
||||
require.NoError(t, err)
|
||||
|
||||
assertMaxActiveFlows(orchestrator, remoteValue)
|
||||
|
||||
// Add a local override for the maxActiveFlows
|
||||
localValue := uint64(500)
|
||||
remoteConfig.ConfigurationFlags[flags.MaxActiveFlows] = fmt.Sprintf("%d", localValue)
|
||||
// Force a configuration refresh
|
||||
err = orchestrator.updateIngress(remoteIngress, remoteWarpConfig)
|
||||
err = orchestrator.updateIngress(ingress.Ingress{}, remoteWarpConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check the value being used is the local one
|
||||
assertMaxActiveFlows(orchestrator, localValue)
|
||||
|
||||
// Remove local override for the maxActiveFlows
|
||||
delete(remoteConfig.ConfigurationFlags, flags.MaxActiveFlows)
|
||||
// Force a configuration refresh
|
||||
err = orchestrator.updateIngress(remoteIngress, remoteWarpConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check the value being used is now the remote again
|
||||
assertMaxActiveFlows(orchestrator, remoteValue)
|
||||
}
|
||||
|
||||
func proxyHTTP(originProxy connection.OriginProxy, hostname string) (*http.Response, error) {
|
||||
@@ -546,6 +569,10 @@ func updateWithValidation(t *testing.T, orchestrator *Orchestrator, version int3
|
||||
|
||||
// TestClosePreviousProxies makes sure proxies started in the previous configuration version are shutdown
|
||||
func TestClosePreviousProxies(t *testing.T) {
|
||||
originDialer := ingress.NewOriginDialer(ingress.OriginConfig{
|
||||
DefaultDialer: testDefaultDialer,
|
||||
TCPWriteTimeout: 1 * time.Second,
|
||||
}, &testLogger)
|
||||
var (
|
||||
hostname = "hello.tunnel1.org"
|
||||
configWithHelloWorld = []byte(fmt.Sprintf(`
|
||||
@@ -576,7 +603,8 @@ func TestClosePreviousProxies(t *testing.T) {
|
||||
}
|
||||
`)
|
||||
initConfig = &Config{
|
||||
Ingress: &ingress.Ingress{},
|
||||
Ingress: &ingress.Ingress{},
|
||||
OriginDialerService: originDialer,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -638,8 +666,13 @@ func TestPersistentConnection(t *testing.T) {
|
||||
hostname = "http://ws.tunnel.org"
|
||||
)
|
||||
msg := t.Name()
|
||||
originDialer := ingress.NewOriginDialer(ingress.OriginConfig{
|
||||
DefaultDialer: testDefaultDialer,
|
||||
TCPWriteTimeout: 1 * time.Second,
|
||||
}, &testLogger)
|
||||
initConfig := &Config{
|
||||
Ingress: &ingress.Ingress{},
|
||||
Ingress: &ingress.Ingress{},
|
||||
OriginDialerService: originDialer,
|
||||
}
|
||||
orchestrator, err := NewOrchestrator(t.Context(), initConfig, testTags, []ingress.Rule{}, &testLogger)
|
||||
require.NoError(t, err)
|
||||
@@ -752,8 +785,9 @@ func TestSerializeLocalConfig(t *testing.T) {
|
||||
ConfigurationFlags: map[string]string{"a": "b"},
|
||||
}
|
||||
|
||||
result, _ := json.Marshal(c)
|
||||
fmt.Println(string(result))
|
||||
result, err := json.Marshal(c)
|
||||
require.NoError(t, err)
|
||||
require.JSONEq(t, `{"__configuration_flags":{"a":"b"},"ingress":[],"warp-routing":{"connectTimeout":0,"tcpKeepAlive":0}}`, string(result))
|
||||
}
|
||||
|
||||
func wsEcho(w http.ResponseWriter, r *http.Request) {
|
||||
|
Reference in New Issue
Block a user