TUN-2714: New edge discovery. Connections try to reconnect to the same edge IP.

This commit is contained in:
Adam Chalmers
2020-02-05 18:55:26 -06:00
parent d6c2c4ee4a
commit a60c0273f5
18 changed files with 1221 additions and 801 deletions

View File

@@ -18,9 +18,11 @@ const (
)
type Connection struct {
id uuid.UUID
muxer *h2mux.Muxer
addr *net.TCPAddr
id uuid.UUID
muxer *h2mux.Muxer
addr *net.TCPAddr
isLongLived bool
longLivedID int
}
func newConnection(muxer *h2mux.Muxer, addr *net.TCPAddr) (*Connection, error) {

View File

@@ -1,420 +0,0 @@
package connection
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
// Used to discover HA origintunneld servers
srvService = "origintunneld"
srvProto = "tcp"
srvName = "argotunnel.com"
// Used to fallback to DoT when we can't use the default resolver to
// discover HA origintunneld servers (GitHub issue #75).
dotServerName = "cloudflare-dns.com"
dotServerAddr = "1.1.1.1:853"
dotTimeout = time.Duration(15 * time.Second)
// SRV record resolution TTL
resolveEdgeAddrTTL = 1 * time.Hour
subsystemEdgeAddrResolver = "edgeAddrResolver"
)
// Redeclare network functions so they can be overridden in tests.
var (
netLookupSRV = net.LookupSRV
netLookupIP = net.LookupIP
)
// If the call to net.LookupSRV fails, try to fall back to DoT from Cloudflare directly.
//
// Note: Instead of DoT, we could also have used DoH. Either of these:
// - directly via the JSON API (https://1.1.1.1/dns-query?ct=application/dns-json&name=_origintunneld._tcp.argotunnel.com&type=srv)
// - indirectly via `tunneldns.NewUpstreamHTTPS()`
// But both of these cases miss out on a key feature from the stdlib:
// "The returned records are sorted by priority and randomized by weight within a priority."
// (https://golang.org/pkg/net/#Resolver.LookupSRV)
// Does this matter? I don't know. It may someday. Let's use DoT so we don't need to worry about it.
// See also: Go feature request for stdlib-supported DoH: https://github.com/golang/go/issues/27552
var fallbackLookupSRV = lookupSRVWithDOT
var friendlyDNSErrorLines = []string{
`Please try the following things to diagnose this issue:`,
` 1. ensure that argotunnel.com is returning "origintunneld" service records.`,
` Run your system's equivalent of: dig srv _origintunneld._tcp.argotunnel.com`,
` 2. ensure that your DNS resolver is not returning compressed SRV records.`,
` See GitHub issue https://github.com/golang/go/issues/27546`,
` For example, you could use Cloudflare's 1.1.1.1 as your resolver:`,
` https://developers.cloudflare.com/1.1.1.1/setting-up-1.1.1.1/`,
}
// EdgeServiceDiscoverer is an interface for looking up Cloudflare's edge network addresses
type EdgeServiceDiscoverer interface {
// Addr returns an unused address to connect to cloudflare's edge network.
// Before this method returns, the address will be removed from the pool of available addresses,
// so the caller can assume they have exclusive access to the address for tunneling purposes.
// The caller should remember to put it back via ReplaceAddr or MarkAddrBad.
Addr() (*net.TCPAddr, error)
// AnyAddr returns an address to connect to cloudflare's edge network.
// It may or may not be in active use for a tunnel.
// The caller should NOT return it via ReplaceAddr or MarkAddrBad!
AnyAddr() (*net.TCPAddr, error)
// ReplaceAddr is called when the address is no longer needed, e.g. due to a scaling-down of numHAConnections.
// It returns the address to the pool of available addresses.
ReplaceAddr(addr *net.TCPAddr)
// MarkAddrBad is called when there was a connectivity error for the address.
// It marks the address as unused but doesn't return it to the pool of available addresses.
MarkAddrBad(addr *net.TCPAddr)
// AvailableAddrs returns the number of addresses available for use
// (less those that have been marked bad).
AvailableAddrs() int
// Refresh rediscovers Cloudflare's edge network addresses.
// It resets the state of "bad" addresses but not those in active use.
Refresh() error
}
// EdgeAddrResolver discovers the addresses of Cloudflare's edge network through SRV record.
// It implements EdgeServiceDiscoverer interface
type EdgeAddrResolver struct {
sync.Mutex
// HA regions
regions []*region
// Logger for noteworthy events
logger *logrus.Entry
}
type region struct {
// Addresses that we expect will be in active use
addrs []*net.TCPAddr
// Addresses that are in active use.
// This is actually a set of net.TCPAddr's, but we can't make a map like
// map[net.TCPAddr]bool
// since net.TCPAddr contains a field of type net.IP and therefore it cannot be used as a map key.
// So instead we use map[string]*net.TCPAddr, where the keys are obtained by net.TCPAddr.String().
// (We keep the "raw" *net.TCPAddr values for the convenience of AnyAddr(). If that method didn't
// exist, we wouldn't strictly need the values, and this could be a map[string]bool.)
inUse map[string]*net.TCPAddr
// Addresses that were discarded due to a network error.
// Not sure what we'll do with these, but it feels good to keep them around for now.
bad []*net.TCPAddr
}
func NewEdgeAddrResolver(logger *logrus.Logger) (EdgeServiceDiscoverer, error) {
r := &EdgeAddrResolver{
logger: logger.WithField("subsystem", subsystemEdgeAddrResolver),
}
if err := r.Refresh(); err != nil {
return nil, err
}
return r, nil
}
func (r *EdgeAddrResolver) Addr() (*net.TCPAddr, error) {
r.Lock()
defer r.Unlock()
// compute the largest region based on len(addrs)
var largestRegion *region
{
if len(r.regions) == 0 {
return nil, errors.New("No HA regions")
}
largestRegion = r.regions[0]
for _, region := range r.regions[1:] {
if len(region.addrs) > len(largestRegion.addrs) {
largestRegion = region
}
}
if len(largestRegion.addrs) == 0 {
return nil, errors.New("No IP address to claim")
}
}
var addr *net.TCPAddr
addr, largestRegion.addrs = popAddr(largestRegion.addrs)
largestRegion.inUse[addr.String()] = addr
return addr, nil
}
func (r *EdgeAddrResolver) AnyAddr() (*net.TCPAddr, error) {
r.Lock()
defer r.Unlock()
for _, region := range r.regions {
// return an unused addr
if len(region.addrs) > 0 {
return region.addrs[rand.Intn(len(region.addrs))], nil
}
// return an addr that's in use
for _, addr := range region.inUse {
return addr, nil
}
}
return nil, fmt.Errorf("No IP addresses")
}
func (r *EdgeAddrResolver) ReplaceAddr(addr *net.TCPAddr) {
r.Lock()
defer r.Unlock()
addrString := addr.String()
for _, region := range r.regions {
if _, ok := region.inUse[addrString]; ok {
delete(region.inUse, addrString)
region.addrs = append(region.addrs, addr)
break
}
}
}
func (r *EdgeAddrResolver) MarkAddrBad(addr *net.TCPAddr) {
r.Lock()
defer r.Unlock()
addrString := addr.String()
for _, region := range r.regions {
if _, ok := region.inUse[addrString]; ok {
delete(region.inUse, addrString)
region.bad = append(region.bad, addr)
break
}
}
}
func (r *EdgeAddrResolver) AvailableAddrs() int {
r.Lock()
defer r.Unlock()
result := 0
for _, region := range r.regions {
result += len(region.addrs)
}
return result
}
func (r *EdgeAddrResolver) Refresh() error {
addrLists, err := EdgeDiscovery(r.logger)
if err != nil {
return err
}
r.Lock()
defer r.Unlock()
inUse := allInUse(r.regions)
r.regions = makeHARegions(addrLists, inUse)
return nil
}
// EdgeDiscovery implements HA service discovery lookup.
func EdgeDiscovery(logger *logrus.Entry) ([][]*net.TCPAddr, error) {
_, addrs, err := netLookupSRV(srvService, srvProto, srvName)
if err != nil {
_, fallbackAddrs, fallbackErr := fallbackLookupSRV(srvService, srvProto, srvName)
if fallbackErr != nil || len(fallbackAddrs) == 0 {
// use the original DNS error `err` in messages, not `fallbackErr`
logger.Errorln("Error looking up Cloudflare edge IPs: the DNS query failed:", err)
for _, s := range friendlyDNSErrorLines {
logger.Errorln(s)
}
return nil, errors.Wrapf(err, "Could not lookup srv records on _%v._%v.%v", srvService, srvProto, srvName)
}
// Accept the fallback results and keep going
addrs = fallbackAddrs
}
var resolvedIPsPerCNAME [][]*net.TCPAddr
for _, addr := range addrs {
ips, err := resolveSRVToTCP(addr)
if err != nil {
return nil, err
}
resolvedIPsPerCNAME = append(resolvedIPsPerCNAME, ips)
}
return resolvedIPsPerCNAME, nil
}
func lookupSRVWithDOT(service, proto, name string) (cname string, addrs []*net.SRV, err error) {
// Inspiration: https://github.com/artyom/dot/blob/master/dot.go
r := &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, _ string, _ string) (net.Conn, error) {
var dialer net.Dialer
conn, err := dialer.DialContext(ctx, "tcp", dotServerAddr)
if err != nil {
return nil, err
}
tlsConfig := &tls.Config{ServerName: dotServerName}
return tls.Client(conn, tlsConfig), nil
},
}
ctx, cancel := context.WithTimeout(context.Background(), dotTimeout)
defer cancel()
return r.LookupSRV(ctx, srvService, srvProto, srvName)
}
func resolveSRVToTCP(srv *net.SRV) ([]*net.TCPAddr, error) {
ips, err := netLookupIP(srv.Target)
if err != nil {
return nil, errors.Wrapf(err, "Couldn't resolve SRV record %v", srv)
}
if len(ips) == 0 {
return nil, fmt.Errorf("SRV record %v had no IPs", srv)
}
addrs := make([]*net.TCPAddr, len(ips))
for i, ip := range ips {
addrs[i] = &net.TCPAddr{IP: ip, Port: int(srv.Port)}
}
return addrs, nil
}
// EdgeHostnameResolver discovers the addresses of Cloudflare's edge network via a list of server hostnames.
// It implements EdgeServiceDiscoverer interface, and is used mainly for testing connectivity.
type EdgeHostnameResolver struct {
sync.Mutex
// hostnames of edge servers
hostnames []string
// Addrs to connect to cloudflare's edge network
addrs []*net.TCPAddr
// Addresses that are in active use.
// This is actually a set of net.TCPAddr's. We have to encode the keys
// with .String(), since net.TCPAddr contains a field of type net.IP and
// therefore it cannot be used as a map key
inUse map[string]*net.TCPAddr
// Addresses that were discarded due to a network error.
// Not sure what we'll do with these, but it feels good to keep them around for now.
bad []*net.TCPAddr
}
func NewEdgeHostnameResolver(edgeHostnames []string) (EdgeServiceDiscoverer, error) {
r := &EdgeHostnameResolver{
hostnames: edgeHostnames,
inUse: map[string]*net.TCPAddr{},
}
if err := r.Refresh(); err != nil {
return nil, err
}
return r, nil
}
func (r *EdgeHostnameResolver) Addr() (*net.TCPAddr, error) {
r.Lock()
defer r.Unlock()
if len(r.addrs) == 0 {
return nil, errors.New("No IP address to claim")
}
var addr *net.TCPAddr
addr, r.addrs = popAddr(r.addrs)
r.inUse[addr.String()] = addr
return addr, nil
}
func (r *EdgeHostnameResolver) AnyAddr() (*net.TCPAddr, error) {
r.Lock()
defer r.Unlock()
// return an unused addr
if len(r.addrs) > 0 {
return r.addrs[rand.Intn(len(r.addrs))], nil
}
// return an addr that's in use
for _, addr := range r.inUse {
return addr, nil
}
return nil, errors.New("No IP addresses")
}
func (r *EdgeHostnameResolver) ReplaceAddr(addr *net.TCPAddr) {
r.Lock()
defer r.Unlock()
delete(r.inUse, addr.String())
r.addrs = append(r.addrs, addr)
}
func (r *EdgeHostnameResolver) MarkAddrBad(addr *net.TCPAddr) {
r.Lock()
defer r.Unlock()
delete(r.inUse, addr.String())
r.bad = append(r.bad, addr)
}
func (r *EdgeHostnameResolver) AvailableAddrs() int {
r.Lock()
defer r.Unlock()
return len(r.addrs)
}
func (r *EdgeHostnameResolver) Refresh() error {
newAddrs, err := ResolveAddrs(r.hostnames)
if err != nil {
return err
}
r.Lock()
defer r.Unlock()
var notInUse []*net.TCPAddr
for _, newAddr := range newAddrs {
if _, ok := r.inUse[newAddr.String()]; !ok {
notInUse = append(notInUse, newAddr)
}
}
r.addrs = notInUse
r.bad = nil
return nil
}
// Resolve TCP address given a list of addresses. Address can be a hostname, however, it will return at most one
// of the hostname's IP addresses
func ResolveAddrs(addrs []string) ([]*net.TCPAddr, error) {
var tcpAddrs []*net.TCPAddr
for _, addr := range addrs {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
}
tcpAddrs = append(tcpAddrs, tcpAddr)
}
return tcpAddrs, nil
}
// Compute total set of IP addresses in use. This is useful if the regions
// are returned in a different order, or if an IP address is assigned to
// a different region for some reasion.
func allInUse(regions []*region) map[string]*net.TCPAddr {
result := make(map[string]*net.TCPAddr)
for _, region := range regions {
for k, v := range region.inUse {
result[k] = v
}
}
return result
}
func makeHARegions(addrLists [][]*net.TCPAddr, inUse map[string]*net.TCPAddr) (regions []*region) {
for _, addrList := range addrLists {
region := &region{inUse: map[string]*net.TCPAddr{}}
for _, addr := range addrList {
addrString := addr.String()
// No matter what region `addr` used to belong to, it's now a part
// of this region, so add it to this region's `inUse` map.
if _, ok := inUse[addrString]; ok {
region.inUse[addrString] = addr
} else {
region.addrs = append(region.addrs, addr)
}
}
regions = append(regions, region)
}
return
}
func popAddr(addrs []*net.TCPAddr) (*net.TCPAddr, []*net.TCPAddr) {
first := addrs[0]
addrs[0] = nil // prevent memory leak
addrs = addrs[1:]
return first, addrs
}

View File

@@ -1,317 +0,0 @@
package connection
import (
"net"
"sync"
"testing"
"testing/quick"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
func TestEdgeDiscovery(t *testing.T) {
mockAddrs := newMockAddrs(19, 2, 5)
netLookupSRV = mockNetLookupSRV(mockAddrs)
netLookupIP = mockNetLookupIP(mockAddrs)
expectedAddrSet := map[string]bool{}
for _, addrs := range mockAddrs.addrMap {
for _, addr := range addrs {
expectedAddrSet[addr.String()] = true
}
}
addrLists, err := EdgeDiscovery(logrus.New().WithFields(logrus.Fields{}))
assert.NoError(t, err)
actualAddrSet := map[string]bool{}
for _, addrs := range addrLists {
for _, addr := range addrs {
actualAddrSet[addr.String()] = true
}
}
assert.Equal(t, expectedAddrSet, actualAddrSet)
}
func TestAllInUse(t *testing.T) {
for _, testCase := range []struct {
regions []*region
expected map[string]*net.TCPAddr
}{
{
regions: nil,
expected: map[string]*net.TCPAddr{},
},
{
regions: []*region{
&region{inUse: map[string]*net.TCPAddr{}},
&region{inUse: map[string]*net.TCPAddr{}},
},
expected: map[string]*net.TCPAddr{},
},
{
regions: []*region{
&region{inUse: map[string]*net.TCPAddr{":1": &net.TCPAddr{Port: 1}}},
&region{inUse: map[string]*net.TCPAddr{":4": &net.TCPAddr{Port: 4}}},
},
expected: map[string]*net.TCPAddr{":1": &net.TCPAddr{Port: 1}, ":4": &net.TCPAddr{Port: 4}},
},
} {
actual := allInUse(testCase.regions)
assert.Equal(t, testCase.expected, actual)
}
}
func TestMakeRegions(t *testing.T) {
for _, testCase := range []struct {
addrList [][]*net.TCPAddr
inUse map[string]*net.TCPAddr
expected []*region
}{
{
addrList: [][]*net.TCPAddr{},
expected: nil,
},
{
addrList: [][]*net.TCPAddr{
[]*net.TCPAddr{&net.TCPAddr{Port: 1}, &net.TCPAddr{Port: 2}},
},
expected: []*region{
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 1}, &net.TCPAddr{Port: 2}}, inUse: map[string]*net.TCPAddr{}},
},
},
{
addrList: [][]*net.TCPAddr{
[]*net.TCPAddr{&net.TCPAddr{Port: 1}, &net.TCPAddr{Port: 2}},
[]*net.TCPAddr{&net.TCPAddr{Port: 3}, &net.TCPAddr{Port: 4}},
},
expected: []*region{
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 1}, &net.TCPAddr{Port: 2}}, inUse: map[string]*net.TCPAddr{}},
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 3}, &net.TCPAddr{Port: 4}}, inUse: map[string]*net.TCPAddr{}},
},
},
{
addrList: [][]*net.TCPAddr{
[]*net.TCPAddr{&net.TCPAddr{Port: 1}, &net.TCPAddr{Port: 2}},
[]*net.TCPAddr{&net.TCPAddr{Port: 3}, &net.TCPAddr{Port: 4}},
},
inUse: map[string]*net.TCPAddr{
":1": &net.TCPAddr{Port: 1},
":4": &net.TCPAddr{Port: 4},
},
expected: []*region{
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 2}}, inUse: map[string]*net.TCPAddr{":1": &net.TCPAddr{Port: 1}}},
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 3}}, inUse: map[string]*net.TCPAddr{":4": &net.TCPAddr{Port: 4}}},
},
},
} {
actual := makeHARegions(testCase.addrList, testCase.inUse)
assert.Equal(t, testCase.expected, actual)
}
}
func assertIsBalanced(t *testing.T, regions []*region) bool {
// Compute max(len(region.addrs) for region in regions)
// No region should have significantly fewer addresses than this
var longestAddrs int
{
longestAddrs = 0
for _, region := range regions {
if l := len(region.addrs); l > longestAddrs {
longestAddrs = l
}
}
}
for _, region := range regions {
if len(region.addrs) == longestAddrs || len(region.addrs) == longestAddrs-1 {
continue
}
return assert.Fail(t,
"found a region with %v free addrs, while the longest addrs list is %v",
len(region.addrs), longestAddrs)
}
return true
}
// Various end-to-end tests, run with quickcheck (i.e. the testing/quick package)
func TestEdgeAddrResolver(t *testing.T) {
concurrentReplacement := func(mockAddrs mockAddrs) bool {
netLookupSRV = mockNetLookupSRV(mockAddrs)
netLookupIP = mockNetLookupIP(mockAddrs)
resolver, err := NewEdgeAddrResolver(logrus.New())
if !assert.NoError(t, err) {
return false
}
assert.Equal(t, mockAddrs.numAddrs, resolver.AvailableAddrs(),
"every address should be initially available")
// Create several goroutines to simulate HA connections that acquire
// and replace IP addresses.
var wg sync.WaitGroup
wg.Add(mockAddrs.numAddrs)
for i := 0; i < mockAddrs.numAddrs; i++ {
go func() {
defer wg.Done()
const reconnectionCount = 50
for i := 0; i < reconnectionCount; i++ {
if resolver.AvailableAddrs() == 0 {
err = resolver.Refresh()
assert.NoError(t, err)
}
addr, err := resolver.Addr()
if !assert.NoError(t, err) {
return
}
time.Sleep(0) // allow some other goroutine to run
resolver.ReplaceAddr(addr)
time.Sleep(0) // allow some other goroutine to run
}
}()
}
wg.Wait()
assert.Equal(t, mockAddrs.numAddrs, resolver.AvailableAddrs(),
"every address should be available after replacement")
return !t.Failed()
}
badAddrWithRefresh := func(mockAddrs mockAddrs) bool {
netLookupSRV = mockNetLookupSRV(mockAddrs)
netLookupIP = mockNetLookupIP(mockAddrs)
resolver, err := NewEdgeAddrResolver(logrus.New())
if !assert.NoError(t, err) {
return false
}
assert.Equal(t, mockAddrs.numAddrs, resolver.AvailableAddrs(),
"every address should be initially available")
var addrs []*net.TCPAddr
for i := 0; i < mockAddrs.numAddrs; i++ {
assert.Equal(t, mockAddrs.numAddrs-i, resolver.AvailableAddrs())
addr, err := resolver.Addr()
assert.NoError(t, err)
addrs = append(addrs, addr)
}
assert.Equal(t, 0, resolver.AvailableAddrs(), "all addresses should have been taken")
_, err = resolver.Addr()
assert.Error(t, err)
anyAddr, err := resolver.AnyAddr()
assert.NoError(t, err, "should still be okay to call AnyAddr")
resolver.MarkAddrBad(anyAddr)
assert.Equal(t, 0, resolver.AvailableAddrs(), "all addresses should still be used")
_, err = resolver.Addr()
assert.Error(t, err, "all addresses should still be used")
err = resolver.Refresh()
assert.NoError(t, err, "Refresh() should have worked")
assert.Equal(t, 1, resolver.AvailableAddrs(),
"Refresh() should have reset the state of the 'bad' address")
addr, err := resolver.Addr()
assert.NoError(t, err)
assert.Equal(t, anyAddr, addr)
_, err = resolver.Addr()
assert.Error(t, err, "all addresses should be used again")
return !t.Failed()
}
assert.NoError(t, quick.Check(concurrentReplacement, nil))
assert.NoError(t, quick.Check(badAddrWithRefresh, nil))
}
// "White-box" test: runs Addr() and checks internal state
func TestEdgeAddrResolver_Addr(t *testing.T) {
e := &EdgeAddrResolver{regions: nil}
addr, err := e.Addr()
assert.Error(t, err)
testRegions := func() []*region {
return []*region{
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 1}}, inUse: map[string]*net.TCPAddr{":2": &net.TCPAddr{Port: 2}, ":3": &net.TCPAddr{Port: 3}}},
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 4}, &net.TCPAddr{Port: 5}}, inUse: map[string]*net.TCPAddr{":6": &net.TCPAddr{Port: 6}}},
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 7}, &net.TCPAddr{Port: 8}}, inUse: map[string]*net.TCPAddr{":9": &net.TCPAddr{Port: 9}}},
}
}
e = &EdgeAddrResolver{regions: testRegions()}
addr, err = e.Addr()
assert.NoError(t, err)
assert.Equal(t, &net.TCPAddr{Port: 4}, addr)
var expected []*region
{
expected = testRegions()
expected[1].addrs = expected[1].addrs[1:]
expected[1].inUse[":4"] = &net.TCPAddr{Port: 4}
}
assert.Equal(t, expected, e.regions)
}
// "White-box" test: runs AnyAddr() and checks internal state
func TestEdgeAddrResolver_AnyAddr(t *testing.T) {
e := &EdgeAddrResolver{regions: nil}
addr, err := e.AnyAddr()
assert.Error(t, err)
e = &EdgeAddrResolver{regions: []*region{&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 1}}, inUse: map[string]*net.TCPAddr{":2": &net.TCPAddr{Port: 2}}}}}
addr, err = e.AnyAddr()
assert.NoError(t, err)
assert.Equal(t, &net.TCPAddr{Port: 1}, addr, "should have chosen the inactive address")
e = &EdgeAddrResolver{regions: []*region{&region{inUse: map[string]*net.TCPAddr{":1": &net.TCPAddr{Port: 1}}}}}
addr, err = e.AnyAddr()
assert.NoError(t, err)
assert.Equal(t, &net.TCPAddr{Port: 1}, addr, "should have chosen an active address rather than nothing")
}
// "White-box" test: runs ReplaceAddr() and checks internal state
func TestEdgeAddrResolver_ReplaceAddr(t *testing.T) {
e := &EdgeAddrResolver{regions: nil}
e.ReplaceAddr(&net.TCPAddr{Port: 1}) // this shouldn't panic, I guess
testRegions := func() []*region {
return []*region{
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 1}}, inUse: map[string]*net.TCPAddr{":2": &net.TCPAddr{Port: 2}, ":3": &net.TCPAddr{Port: 3}}},
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 4}, &net.TCPAddr{Port: 5}}, inUse: map[string]*net.TCPAddr{":6": &net.TCPAddr{Port: 6}}},
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 7}, &net.TCPAddr{Port: 8}}, inUse: map[string]*net.TCPAddr{":9": &net.TCPAddr{Port: 9}}},
}
}
e = &EdgeAddrResolver{regions: testRegions()}
e.ReplaceAddr(&net.TCPAddr{Port: 6})
var expected []*region
{
expected = testRegions()
delete(expected[1].inUse, ":6")
expected[1].addrs = append(expected[1].addrs, &net.TCPAddr{Port: 6})
}
assert.Equal(t, expected, e.regions)
}
// "White-box" test: runs MarkAddrBad() and checks internal state
func TestEdgeAddrResolver_MarkAddrBad(t *testing.T) {
e := &EdgeAddrResolver{regions: nil}
e.ReplaceAddr(&net.TCPAddr{Port: 1}) // this shouldn't panic, I guess
testRegions := func() []*region {
return []*region{
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 1}}, inUse: map[string]*net.TCPAddr{":2": &net.TCPAddr{Port: 2}, ":3": &net.TCPAddr{Port: 3}}},
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 4}, &net.TCPAddr{Port: 5}}, inUse: map[string]*net.TCPAddr{":6": &net.TCPAddr{Port: 6}}},
&region{addrs: []*net.TCPAddr{&net.TCPAddr{Port: 7}, &net.TCPAddr{Port: 8}}, inUse: map[string]*net.TCPAddr{":9": &net.TCPAddr{Port: 9}}},
}
}
e = &EdgeAddrResolver{regions: testRegions()}
e.MarkAddrBad(&net.TCPAddr{Port: 6})
var expected []*region
{
expected = testRegions()
delete(expected[1].inUse, ":6")
expected[1].bad = append(expected[1].bad, &net.TCPAddr{Port: 6})
}
assert.Equal(t, expected, e.regions)
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/h2mux"
"github.com/cloudflare/cloudflared/streamhandler"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
@@ -35,7 +36,7 @@ type EdgeManager struct {
// cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
cloudflaredConfig *CloudflaredConfig
// serviceDiscoverer returns the next edge addr to connect to
serviceDiscoverer EdgeServiceDiscoverer
serviceDiscoverer *edgediscovery.Edge
// state is attributes of ConnectionManager that can change during runtime.
state *edgeManagerState
@@ -73,7 +74,7 @@ func NewEdgeManager(
edgeConnMgrConfigurable *EdgeManagerConfigurable,
userCredential []byte,
tlsConfig *tls.Config,
serviceDiscoverer EdgeServiceDiscoverer,
serviceDiscoverer *edgediscovery.Edge,
cloudflaredConfig *CloudflaredConfig,
logger *logrus.Logger,
) *EdgeManager {
@@ -91,27 +92,29 @@ func NewEdgeManager(
func (em *EdgeManager) Run(ctx context.Context) error {
defer em.shutdown()
resolveEdgeIPTicker := time.Tick(resolveEdgeAddrTTL)
// Currently, declarative tunnels don't have any concept of a stable connection
// Each edge connection is transient and when it dies, it is replaced by a different one,
// not restarted.
// So in the future we should really change this so that n connections are stored individually
connIndex := 0
for {
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
case <-resolveEdgeIPTicker:
if err := em.serviceDiscoverer.Refresh(); err != nil {
em.logger.WithError(err).Warn("Cannot refresh Cloudflare edge addresses")
}
default:
time.Sleep(1 * time.Second)
}
// Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
// in shouldCreateConnection or shouldReduceConnection calculation
if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
if connErr := em.newConnection(ctx); connErr != nil {
if connErr := em.newConnection(ctx, connIndex); connErr != nil {
if !connErr.ShouldRetry {
em.logger.WithError(connErr).Error(em.noRetryMessage())
return connErr
}
em.logger.WithError(connErr).Error("cannot create new connection")
} else {
connIndex++
}
} else if em.state.shouldReduceConnection() {
if err := em.closeConnection(ctx); err != nil {
@@ -126,8 +129,8 @@ func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurab
em.state.updateConfigurable(newConfigurable)
}
func (em *EdgeManager) newConnection(ctx context.Context) *tunnelpogs.ConnectError {
edgeTCPAddr, err := em.serviceDiscoverer.Addr()
func (em *EdgeManager) newConnection(ctx context.Context, index int) *tunnelpogs.ConnectError {
edgeTCPAddr, err := em.serviceDiscoverer.GetAddr(index)
if err != nil {
return retryConnection(fmt.Sprintf("edge address discovery error: %v", err))
}
@@ -197,7 +200,7 @@ func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
err := conn.Serve(ctx)
em.logger.WithError(err).Warn("Connection closed")
em.state.closeConnection(conn)
em.serviceDiscoverer.ReplaceAddr(conn.addr)
em.serviceDiscoverer.GiveBack(conn.addr)
}
func (em *EdgeManager) noRetryMessage() string {

View File

@@ -1,6 +1,7 @@
package connection
import (
"net"
"testing"
"time"
@@ -8,8 +9,8 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/h2mux"
"github.com/cloudflare/cloudflared/streamhandler"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
@@ -48,14 +49,15 @@ func mockEdgeManager() *EdgeManager {
newConfigChan := make(chan<- *pogs.ClientConfig)
useConfigResultChan := make(<-chan *pogs.UseConfigurationResult)
logger := logrus.New()
edge := edgediscovery.MockEdge(logger, []*net.TCPAddr{})
return NewEdgeManager(
streamhandler.NewStreamHandler(newConfigChan, useConfigResultChan, logger),
configurable,
[]byte{},
nil,
&mockEdgeServiceDiscoverer{},
edge,
cloudflaredConfig,
logrus.New(),
logger,
)
}