TUN-3462: Refactor cloudflared to separate origin from connection

Author: cthuang
Date:   2020-10-08 11:12:26 +01:00
Parent: a5a5b93b64
Commit: 9ac40dcf04
32 changed files with 2006 additions and 1339 deletions

connection/connection.go (new file)

@@ -0,0 +1,91 @@
package connection
import (
"io"
"net/http"
"strconv"
"time"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"github.com/google/uuid"
)
const (
// edgeH2muxTLSServerName is the server name to establish h2mux connection with edge
edgeH2muxTLSServerName = "cftunnel.com"
// edgeH2TLSServerName is the server name to establish http2 connection with edge
edgeH2TLSServerName = "h2.cftunnel.com"
lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
)
type Config struct {
OriginClient OriginClient
GracePeriod time.Duration
ReplaceExisting bool
}
type NamedTunnelConfig struct {
Auth pogs.TunnelAuth
ID uuid.UUID
Client pogs.ClientInfo
Protocol Protocol
}
type ClassicTunnelConfig struct {
Hostname string
OriginCert []byte
// feature-flag to use new edge reconnect tokens
UseReconnectToken bool
}
func (c *ClassicTunnelConfig) IsTrialZone() bool {
return c.Hostname == ""
}
type Protocol int64
const (
H2mux Protocol = iota
HTTP2
)
func ParseProtocol(s string) (Protocol, bool) {
switch s {
case "h2mux":
return H2mux, true
case "http2":
return HTTP2, true
default:
return 0, false
}
}
func (p Protocol) ServerName() string {
switch p {
case H2mux:
return edgeH2muxTLSServerName
case HTTP2:
return edgeH2TLSServerName
default:
return ""
}
}
type OriginClient interface {
Proxy(w ResponseWriter, req *http.Request, isWebsocket bool) error
}
type ResponseWriter interface {
WriteRespHeaders(*http.Response) error
WriteErrorResponse(error)
io.ReadWriter
}
type ConnectedFuse interface {
Connected()
IsConnected() bool
}
func uint8ToString(input uint8) string {
return strconv.FormatUint(uint64(input), 10)
}
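
The OriginClient and ResponseWriter interfaces above are the seam this refactor introduces: the connection layer hands every request to an origin implementation through Proxy and never touches origin logic itself. Below is a minimal sketch of what an implementation could look like, assuming a plain HTTP origin; httpOriginClient and the package name are illustrative and not part of this commit.

// Sketch only: a hypothetical OriginClient backed by net/http.
package originsketch

import (
	"io"
	"net/http"
)

// responseWriter mirrors connection.ResponseWriter from the diff above.
type responseWriter interface {
	WriteRespHeaders(*http.Response) error
	WriteErrorResponse(error)
	io.ReadWriter
}

type httpOriginClient struct {
	transport http.RoundTripper // e.g. http.DefaultTransport
}

func (o *httpOriginClient) Proxy(w responseWriter, req *http.Request, isWebsocket bool) error {
	// Requests parsed from an inbound connection carry RequestURI, which must
	// be cleared before re-sending them as client requests (the same trick
	// httputil.ReverseProxy uses).
	outReq := req.Clone(req.Context())
	outReq.RequestURI = ""

	resp, err := o.transport.RoundTrip(outReq)
	if err != nil {
		w.WriteErrorResponse(err)
		return err
	}
	defer resp.Body.Close()

	if err := w.WriteRespHeaders(resp); err != nil {
		return err
	}
	// Stream the origin response body back over the tunnel connection.
	_, err = io.Copy(w, resp.Body)
	return err
}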

(deleted file)

@@ -1,55 +0,0 @@
package connection
import (
"context"
"crypto/tls"
"net"
"time"
"github.com/pkg/errors"
)
// DialEdge makes a TLS connection to a Cloudflare edge node
func DialEdge(
ctx context.Context,
timeout time.Duration,
tlsConfig *tls.Config,
edgeTCPAddr *net.TCPAddr,
) (net.Conn, error) {
// Inherit from parent context so we can cancel (Ctrl-C) while dialing
dialCtx, dialCancel := context.WithTimeout(ctx, timeout)
defer dialCancel()
dialer := net.Dialer{}
edgeConn, err := dialer.DialContext(dialCtx, "tcp", edgeTCPAddr.String())
if err != nil {
return nil, newDialError(err, "DialContext error")
}
tlsEdgeConn := tls.Client(edgeConn, tlsConfig)
tlsEdgeConn.SetDeadline(time.Now().Add(timeout))
if err = tlsEdgeConn.Handshake(); err != nil {
return nil, newDialError(err, "TLS handshake with edge error")
}
// clear the deadline on the conn; h2mux has its own timeouts
tlsEdgeConn.SetDeadline(time.Time{})
return tlsEdgeConn, nil
}
// DialError is an error returned from DialEdge
type DialError struct {
cause error
}
func newDialError(err error, message string) error {
return DialError{cause: errors.Wrap(err, message)}
}
func (e DialError) Error() string {
return e.cause.Error()
}
func (e DialError) Cause() error {
return e.cause
}

connection/errors.go (new file)

@@ -0,0 +1,76 @@
package connection
import (
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/h2mux"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"github.com/prometheus/client_golang/prometheus"
)
const (
DuplicateConnectionError = "EDUPCONN"
)
// RegisterTunnel error from client
type clientRegisterTunnelError struct {
cause error
}
func newRPCError(cause error, counter *prometheus.CounterVec, name rpcName) clientRegisterTunnelError {
counter.WithLabelValues(cause.Error(), string(name)).Inc()
return clientRegisterTunnelError{cause: cause}
}
func (e clientRegisterTunnelError) Error() string {
return e.cause.Error()
}
type DupConnRegisterTunnelError struct{}
var errDuplicationConnection = &DupConnRegisterTunnelError{}
func (e DupConnRegisterTunnelError) Error() string {
return "already connected to this server, trying another address"
}
// RegisterTunnel error from server
type serverRegisterTunnelError struct {
cause error
permanent bool
}
func (e serverRegisterTunnelError) Error() string {
return e.cause.Error()
}
func serverRegistrationErrorFromRPC(err error) *serverRegisterTunnelError {
if retryable, ok := err.(*tunnelpogs.RetryableError); ok {
return &serverRegisterTunnelError{
cause: retryable.Unwrap(),
permanent: false,
}
}
return &serverRegisterTunnelError{
cause: err,
permanent: true,
}
}
type muxerShutdownError struct{}
func (e muxerShutdownError) Error() string {
return "muxer shutdown"
}
func isHandshakeErrRecoverable(err error, connIndex uint8, observer *Observer) bool {
switch err.(type) {
case edgediscovery.DialError:
observer.Errorf("Connection %d unable to dial edge: %s", connIndex, err)
case h2mux.MuxerHandshakeError:
observer.Errorf("Connection %d handshake with edge server failed: %s", connIndex, err)
default:
observer.Errorf("Connection %d failed: %s", connIndex, err)
return false
}
return true
}
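
The error taxonomy above drives retry decisions in the caller. Here is a small sketch, not part of this commit, of how code inside the package might branch on these types; shouldRetryRegistration is a hypothetical helper. Both value and pointer forms of serverRegisterTunnelError are handled because the diff returns both.

package connection

// shouldRetryRegistration is an illustrative helper classifying the error
// types above for a caller deciding what to do next.
func shouldRetryRegistration(err error) (retry, backoff bool) {
	switch e := err.(type) {
	case *DupConnRegisterTunnelError:
		// Connected to the same edge server twice: retry immediately
		// against a different edge address, no backoff needed.
		return true, false
	case *serverRegisterTunnelError:
		// The server marks whether the failure is permanent; only
		// non-permanent failures are worth retrying, with backoff.
		return !e.permanent, true
	case serverRegisterTunnelError:
		return !e.permanent, true
	case muxerShutdownError:
		// Graceful shutdown is expected, not a failure to retry.
		return false, false
	default:
		return false, false
	}
}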

connection/h2mux.go (new file)

@@ -0,0 +1,216 @@
package connection
import (
"context"
"net"
"net/http"
"time"
"github.com/cloudflare/cloudflared/h2mux"
"github.com/cloudflare/cloudflared/logger"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"github.com/cloudflare/cloudflared/websocket"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
const (
muxerTimeout = 5 * time.Second
openStreamTimeout = 30 * time.Second
)
type h2muxConnection struct {
config *Config
muxerConfig *MuxerConfig
originURL string
muxer *h2mux.Muxer
// connectionID is only used by metrics, and prometheus requires labels to be strings
connIndexStr string
connIndex uint8
observer *Observer
}
type MuxerConfig struct {
HeartbeatInterval time.Duration
MaxHeartbeats uint64
CompressionSetting h2mux.CompressionSetting
MetricsUpdateFreq time.Duration
}
func (mc *MuxerConfig) H2MuxerConfig(h h2mux.MuxedStreamHandler, logger logger.Service) *h2mux.MuxerConfig {
return &h2mux.MuxerConfig{
Timeout: muxerTimeout,
Handler: h,
IsClient: true,
HeartbeatInterval: mc.HeartbeatInterval,
MaxHeartbeats: mc.MaxHeartbeats,
Logger: logger,
CompressionQuality: mc.CompressionSetting,
}
}
// NewH2muxConnection returns a new h2mux connection, any error encountered, and whether that error is recoverable
func NewH2muxConnection(ctx context.Context,
config *Config,
muxerConfig *MuxerConfig,
originURL string,
edgeConn net.Conn,
connIndex uint8,
observer *Observer,
) (*h2muxConnection, error, bool) {
h := &h2muxConnection{
config: config,
muxerConfig: muxerConfig,
originURL: originURL,
connIndexStr: uint8ToString(connIndex),
connIndex: connIndex,
observer: observer,
}
// Establish a muxed connection with the edge
// Client mux handshake with agent server
muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig.H2MuxerConfig(h, observer), h2mux.ActiveStreams)
if err != nil {
recoverable := isHandshakeErrRecoverable(err, connIndex, observer)
return nil, err, recoverable
}
h.muxer = muxer
return h, nil, false
}
func (h *h2muxConnection) ServeNamedTunnel(ctx context.Context, namedTunnel *NamedTunnelConfig, credentialManager CredentialManager, connOptions *tunnelpogs.ConnectionOptions, connectedFuse ConnectedFuse) error {
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return h.serveMuxer(serveCtx)
})
errGroup.Go(func() error {
stream, err := h.newRPCStream(serveCtx, register)
if err != nil {
return err
}
rpcClient := newRegistrationRPCClient(ctx, stream, h.observer)
defer rpcClient.close()
if err = registerConnection(serveCtx, rpcClient, namedTunnel, connOptions, h.connIndex, h.observer); err != nil {
return err
}
connectedFuse.Connected()
return nil
})
errGroup.Go(func() error {
h.controlLoop(serveCtx, connectedFuse, true)
return nil
})
return errGroup.Wait()
}
func (h *h2muxConnection) ServeClassicTunnel(ctx context.Context, classicTunnel *ClassicTunnelConfig, credentialManager CredentialManager, registrationOptions *tunnelpogs.RegistrationOptions, connectedFuse ConnectedFuse) error {
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return h.serveMuxer(serveCtx)
})
errGroup.Go(func() (err error) {
defer func() {
if err == nil {
connectedFuse.Connected()
}
}()
if classicTunnel.UseReconnectToken && connectedFuse.IsConnected() {
err := h.reconnectTunnel(ctx, credentialManager, classicTunnel, registrationOptions)
if err == nil {
return nil
}
// log errors and proceed to RegisterTunnel
h.observer.Errorf("Couldn't reconnect connection %d. Reregistering it instead. Error was: %v", h.connIndex, err)
}
return h.registerTunnel(ctx, credentialManager, classicTunnel, registrationOptions)
})
errGroup.Go(func() error {
h.controlLoop(serveCtx, connectedFuse, false)
return nil
})
return errGroup.Wait()
}
func (h *h2muxConnection) serveMuxer(ctx context.Context) error {
// All routines should stop when the muxer finishes serving. When the muxer shuts
// down gracefully it doesn't return an error, so we return muxerShutdownError
// here to notify the other routines to stop
err := h.muxer.Serve(ctx)
if err == nil {
return muxerShutdownError{}
}
return err
}
func (h *h2muxConnection) controlLoop(ctx context.Context, connectedFuse ConnectedFuse, isNamedTunnel bool) {
updateMetricsTickC := time.Tick(h.muxerConfig.MetricsUpdateFreq)
for {
select {
case <-ctx.Done():
// UnregisterTunnel blocks until the RPC call returns
if connectedFuse.IsConnected() {
h.unregister(isNamedTunnel)
}
h.muxer.Shutdown()
return
case <-updateMetricsTickC:
h.observer.metrics.updateMuxerMetrics(h.connIndexStr, h.muxer.Metrics())
}
}
}
func (h *h2muxConnection) newRPCStream(ctx context.Context, rpcName rpcName) (*h2mux.MuxedStream, error) {
openStreamCtx, openStreamCancel := context.WithTimeout(ctx, openStreamTimeout)
defer openStreamCancel()
stream, err := h.muxer.OpenRPCStream(openStreamCtx)
if err != nil {
return nil, err
}
return stream, nil
}
func (h *h2muxConnection) ServeStream(stream *h2mux.MuxedStream) error {
respWriter := &h2muxRespWriter{stream}
req, reqErr := h.newRequest(stream)
if reqErr != nil {
respWriter.WriteErrorResponse(reqErr)
return reqErr
}
return h.config.OriginClient.Proxy(respWriter, req, websocket.IsWebSocketUpgrade(req))
}
func (h *h2muxConnection) newRequest(stream *h2mux.MuxedStream) (*http.Request, error) {
req, err := http.NewRequest("GET", h.originURL, h2mux.MuxedStreamReader{MuxedStream: stream})
if err != nil {
return nil, errors.Wrap(err, "Unexpected error from http.NewRequest")
}
err = h2mux.H2RequestHeadersToH1Request(stream.Headers, req)
if err != nil {
return nil, errors.Wrap(err, "invalid request received")
}
return req, nil
}
type h2muxRespWriter struct {
*h2mux.MuxedStream
}
func (rp *h2muxRespWriter) WriteRespHeaders(resp *http.Response) error {
return rp.WriteHeaders(h2mux.H1ResponseToH2ResponseHeaders(resp))
}
func (rp *h2muxRespWriter) WriteErrorResponse(err error) {
rp.WriteHeaders([]h2mux.Header{
{Name: ":status", Value: "502"},
h2mux.CreateResponseMetaHeader(h2mux.ResponseMetaHeaderField, h2mux.ResponseSourceCloudflared),
})
rp.Write([]byte("502 Bad Gateway"))
}
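
Both ServeNamedTunnel and ServeClassicTunnel coordinate through the ConnectedFuse passed in by the caller, and Connected/IsConnected may race across the errgroup goroutines above. A minimal sketch of an implementation follows; booleanFuse is illustrative, cloudflared wires in its own fuse elsewhere.

package connection

import "sync"

// booleanFuse is a hypothetical ConnectedFuse: a one-way boolean that is
// safe to trip and read from multiple goroutines.
type booleanFuse struct {
	mu        sync.Mutex
	connected bool
}

// Connected trips the fuse; safe to call more than once.
func (f *booleanFuse) Connected() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.connected = true
}

// IsConnected reports whether the fuse has been tripped.
func (f *booleanFuse) IsConnected() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.connected
}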

connection/http2.go (new file)

@@ -0,0 +1,253 @@
package connection
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"github.com/cloudflare/cloudflared/h2mux"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"golang.org/x/net/http2"
)
const (
internalUpgradeHeader = "Cf-Cloudflared-Proxy-Connection-Upgrade"
websocketUpgrade = "websocket"
controlStreamUpgrade = "control-stream"
)
type HTTP2Connection struct {
conn net.Conn
server *http2.Server
config *Config
originURL *url.URL
namedTunnel *NamedTunnelConfig
connOptions *tunnelpogs.ConnectionOptions
observer *Observer
connIndexStr string
connIndex uint8
shutdownChan chan struct{}
connectedFuse ConnectedFuse
}
func NewHTTP2Connection(conn net.Conn, config *Config, originURL *url.URL, namedTunnelConfig *NamedTunnelConfig, connOptions *tunnelpogs.ConnectionOptions, observer *Observer, connIndex uint8, connectedFuse ConnectedFuse) (*HTTP2Connection, error) {
return &HTTP2Connection{
conn: conn,
server: &http2.Server{},
config: config,
originURL: originURL,
namedTunnel: namedTunnelConfig,
connOptions: connOptions,
observer: observer,
connIndexStr: uint8ToString(connIndex),
connIndex: connIndex,
shutdownChan: make(chan struct{}),
connectedFuse: connectedFuse,
}, nil
}
func (c *HTTP2Connection) Serve(ctx context.Context) {
go func() {
<-ctx.Done()
c.close()
}()
c.server.ServeConn(c.conn, &http2.ServeConnOpts{
Context: ctx,
Handler: c,
})
}
func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.URL.Scheme = c.originURL.Scheme
r.URL.Host = c.originURL.Host
respWriter := &http2RespWriter{
r: r.Body,
w: w,
}
if isControlStreamUpgrade(r) {
err := c.serveControlStream(r.Context(), respWriter)
if err != nil {
respWriter.WriteErrorResponse(err)
}
} else if isWebsocketUpgrade(r) {
wsRespWriter, err := newWSRespWriter(respWriter)
if err != nil {
respWriter.WriteErrorResponse(err)
return
}
stripWebsocketUpgradeHeader(r)
c.config.OriginClient.Proxy(wsRespWriter, r, true)
} else {
c.config.OriginClient.Proxy(respWriter, r, false)
}
}
func (c *HTTP2Connection) serveControlStream(ctx context.Context, h2RespWriter *http2RespWriter) error {
stream, err := newWSRespWriter(h2RespWriter)
if err != nil {
return err
}
rpcClient := newRegistrationRPCClient(ctx, stream, c.observer)
defer rpcClient.close()
if err = registerConnection(ctx, rpcClient, c.namedTunnel, c.connOptions, c.connIndex, c.observer); err != nil {
return err
}
c.connectedFuse.Connected()
<-c.shutdownChan
c.gracefulShutdown(ctx, rpcClient)
close(c.shutdownChan)
return nil
}
func (c *HTTP2Connection) registerConnection(
ctx context.Context,
rpcClient tunnelpogs.RegistrationServer_PogsClient,
) error {
connDetail, err := rpcClient.RegisterConnection(
ctx,
c.namedTunnel.Auth,
c.namedTunnel.ID,
c.connIndex,
c.connOptions,
)
if err != nil {
c.observer.Errorf("Cannot register connection, err: %v", err)
return err
}
c.observer.Infof("Connection %s registered with %s using ID %s", c.connIndexStr, connDetail.Location, connDetail.UUID)
return nil
}
func (c *HTTP2Connection) gracefulShutdown(ctx context.Context, rpcClient *registrationServerClient) {
ctx, cancel := context.WithTimeout(ctx, c.config.GracePeriod)
defer cancel()
rpcClient.client.UnregisterConnection(ctx)
}
func (c *HTTP2Connection) close() {
// Send signal to control loop to start graceful shutdown
c.shutdownChan <- struct{}{}
// Wait for control loop to close channel
<-c.shutdownChan
c.conn.Close()
}
type http2RespWriter struct {
r io.Reader
w http.ResponseWriter
}
func (rp *http2RespWriter) WriteRespHeaders(resp *http.Response) error {
dest := rp.w.Header()
userHeaders := make(http.Header, len(resp.Header))
for header, values := range resp.Header {
// Since these are http2 headers, they're required to be lowercase
h2name := strings.ToLower(header)
for _, v := range values {
if h2name == "content-length" {
// This header has meaning in HTTP/2 and will be used by the edge,
// so it should be sent as an HTTP/2 response header.
dest.Add(h2name, v)
} else if !h2mux.IsControlHeader(h2name) || h2mux.IsWebsocketClientHeader(h2name) {
// User headers, on the other hand, must all be serialized so that
// HTTP/2 header validation won't be applied to HTTP/1 header values
userHeaders.Add(h2name, v)
}
}
}
// Perform user header serialization and set them in the single header
dest.Set(h2mux.ResponseUserHeadersField, h2mux.SerializeHeaders(userHeaders))
status := resp.StatusCode
// HTTP2 removes support for 101 Switching Protocols https://tools.ietf.org/html/rfc7540#section-8.1.1
if status == http.StatusSwitchingProtocols {
status = http.StatusOK
}
rp.w.WriteHeader(status)
return nil
}
func (rp *http2RespWriter) WriteErrorResponse(err error) {
jsonResponseMetaHeader, err := json.Marshal(h2mux.ResponseMetaHeader{Source: h2mux.ResponseSourceCloudflared})
if err == nil {
rp.w.Header().Set(h2mux.ResponseMetaHeaderField, string(jsonResponseMetaHeader))
}
rp.w.WriteHeader(http.StatusBadGateway)
}
func (rp *http2RespWriter) Read(p []byte) (n int, err error) {
return rp.r.Read(p)
}
func (rp *http2RespWriter) Write(p []byte) (n int, err error) {
return rp.w.Write(p)
}
type wsRespWriter struct {
h2 *http2RespWriter
flusher http.Flusher
}
func newWSRespWriter(h2 *http2RespWriter) (*wsRespWriter, error) {
flusher, ok := h2.w.(http.Flusher)
if !ok {
return nil, fmt.Errorf("ResponseWriter doesn't implement http.Flusher")
}
return &wsRespWriter{
h2: h2,
flusher: flusher,
}, nil
}
func (rw *wsRespWriter) WriteRespHeaders(resp *http.Response) error {
err := rw.h2.WriteRespHeaders(resp)
if err != nil {
return err
}
rw.flusher.Flush()
return nil
}
func (rw *wsRespWriter) WriteErrorResponse(err error) {
rw.h2.WriteErrorResponse(err)
}
func (rw *wsRespWriter) Read(p []byte) (n int, err error) {
return rw.h2.Read(p)
}
func (rw *wsRespWriter) Write(p []byte) (n int, err error) {
n, err = rw.h2.Write(p)
if err != nil {
return
}
rw.flusher.Flush()
return
}
func (rw *wsRespWriter) Close() error {
return nil
}
func isControlStreamUpgrade(r *http.Request) bool {
return strings.ToLower(r.Header.Get(internalUpgradeHeader)) == controlStreamUpgrade
}
func isWebsocketUpgrade(r *http.Request) bool {
return strings.ToLower(r.Header.Get(internalUpgradeHeader)) == websocketUpgrade
}
func stripWebsocketUpgradeHeader(r *http.Request) {
r.Header.Del(internalUpgradeHeader)
}
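
The Cf-Cloudflared-Proxy-Connection-Upgrade header is the internal protocol the edge uses to multiplex control streams, websockets, and ordinary requests over a single HTTP/2 connection. A quick sketch of the detection helpers in action (illustrative snippet, not part of the commit):

package connection

import (
	"fmt"
	"net/http/httptest"
)

// demoUpgradeDetection is an illustrative helper, not part of the commit.
func demoUpgradeDetection() {
	req := httptest.NewRequest("GET", "https://origin.internal", nil)
	req.Header.Set("Cf-Cloudflared-Proxy-Connection-Upgrade", "websocket")

	fmt.Println(isWebsocketUpgrade(req))     // true
	fmt.Println(isControlStreamUpgrade(req)) // false

	// Once the proxying mode is decided, the internal header is stripped
	// so it never leaks to the origin.
	stripWebsocketUpgradeHeader(req)
	fmt.Println(isWebsocketUpgrade(req)) // false
}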

connection/metrics.go (new file)

@@ -0,0 +1,409 @@
package connection
import (
"sync"
"time"
"github.com/cloudflare/cloudflared/h2mux"
"github.com/prometheus/client_golang/prometheus"
)
const (
MetricsNamespace = "cloudflared"
TunnelSubsystem = "tunnel"
muxerSubsystem = "muxer"
)
type muxerMetrics struct {
rtt *prometheus.GaugeVec
rttMin *prometheus.GaugeVec
rttMax *prometheus.GaugeVec
receiveWindowAve *prometheus.GaugeVec
sendWindowAve *prometheus.GaugeVec
receiveWindowMin *prometheus.GaugeVec
receiveWindowMax *prometheus.GaugeVec
sendWindowMin *prometheus.GaugeVec
sendWindowMax *prometheus.GaugeVec
inBoundRateCurr *prometheus.GaugeVec
inBoundRateMin *prometheus.GaugeVec
inBoundRateMax *prometheus.GaugeVec
outBoundRateCurr *prometheus.GaugeVec
outBoundRateMin *prometheus.GaugeVec
outBoundRateMax *prometheus.GaugeVec
compBytesBefore *prometheus.GaugeVec
compBytesAfter *prometheus.GaugeVec
compRateAve *prometheus.GaugeVec
}
type tunnelMetrics struct {
timerRetries prometheus.Gauge
serverLocations *prometheus.GaugeVec
// locationLock is a mutex for oldServerLocations
locationLock sync.Mutex
// oldServerLocations stores the last server the tunnel was connected to
oldServerLocations map[string]string
regSuccess *prometheus.CounterVec
regFail *prometheus.CounterVec
rpcFail *prometheus.CounterVec
muxerMetrics *muxerMetrics
tunnelsHA tunnelsForHA
userHostnamesCounts *prometheus.CounterVec
}
func newMuxerMetrics() *muxerMetrics {
rtt := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "rtt",
Help: "Round-trip time in millisecond",
},
[]string{"connection_id"},
)
prometheus.MustRegister(rtt)
rttMin := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "rtt_min",
Help: "Shortest round-trip time in millisecond",
},
[]string{"connection_id"},
)
prometheus.MustRegister(rttMin)
rttMax := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "rtt_max",
Help: "Longest round-trip time in millisecond",
},
[]string{"connection_id"},
)
prometheus.MustRegister(rttMax)
receiveWindowAve := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "receive_window_ave",
Help: "Average receive window size in bytes",
},
[]string{"connection_id"},
)
prometheus.MustRegister(receiveWindowAve)
sendWindowAve := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "send_window_ave",
Help: "Average send window size in bytes",
},
[]string{"connection_id"},
)
prometheus.MustRegister(sendWindowAve)
receiveWindowMin := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "receive_window_min",
Help: "Smallest receive window size in bytes",
},
[]string{"connection_id"},
)
prometheus.MustRegister(receiveWindowMin)
receiveWindowMax := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "receive_window_max",
Help: "Largest receive window size in bytes",
},
[]string{"connection_id"},
)
prometheus.MustRegister(receiveWindowMax)
sendWindowMin := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "send_window_min",
Help: "Smallest send window size in bytes",
},
[]string{"connection_id"},
)
prometheus.MustRegister(sendWindowMin)
sendWindowMax := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "send_window_max",
Help: "Largest send window size in bytes",
},
[]string{"connection_id"},
)
prometheus.MustRegister(sendWindowMax)
inBoundRateCurr := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "inbound_bytes_per_sec_curr",
Help: "Current inbounding bytes per second, 0 if there is no incoming connection",
},
[]string{"connection_id"},
)
prometheus.MustRegister(inBoundRateCurr)
inBoundRateMin := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "inbound_bytes_per_sec_min",
Help: "Minimum non-zero inbounding bytes per second",
},
[]string{"connection_id"},
)
prometheus.MustRegister(inBoundRateMin)
inBoundRateMax := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "inbound_bytes_per_sec_max",
Help: "Maximum inbounding bytes per second",
},
[]string{"connection_id"},
)
prometheus.MustRegister(inBoundRateMax)
outBoundRateCurr := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "outbound_bytes_per_sec_curr",
Help: "Current outbounding bytes per second, 0 if there is no outgoing traffic",
},
[]string{"connection_id"},
)
prometheus.MustRegister(outBoundRateCurr)
outBoundRateMin := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "outbound_bytes_per_sec_min",
Help: "Minimum non-zero outbounding bytes per second",
},
[]string{"connection_id"},
)
prometheus.MustRegister(outBoundRateMin)
outBoundRateMax := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "outbound_bytes_per_sec_max",
Help: "Maximum outbounding bytes per second",
},
[]string{"connection_id"},
)
prometheus.MustRegister(outBoundRateMax)
compBytesBefore := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "comp_bytes_before",
Help: "Bytes sent via cross-stream compression, pre compression",
},
[]string{"connection_id"},
)
prometheus.MustRegister(compBytesBefore)
compBytesAfter := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "comp_bytes_after",
Help: "Bytes sent via cross-stream compression, post compression",
},
[]string{"connection_id"},
)
prometheus.MustRegister(compBytesAfter)
compRateAve := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: muxerSubsystem,
Name: "comp_rate_ave",
Help: "Average outbound cross-stream compression ratio",
},
[]string{"connection_id"},
)
prometheus.MustRegister(compRateAve)
return &muxerMetrics{
rtt: rtt,
rttMin: rttMin,
rttMax: rttMax,
receiveWindowAve: receiveWindowAve,
sendWindowAve: sendWindowAve,
receiveWindowMin: receiveWindowMin,
receiveWindowMax: receiveWindowMax,
sendWindowMin: sendWindowMin,
sendWindowMax: sendWindowMax,
inBoundRateCurr: inBoundRateCurr,
inBoundRateMin: inBoundRateMin,
inBoundRateMax: inBoundRateMax,
outBoundRateCurr: outBoundRateCurr,
outBoundRateMin: outBoundRateMin,
outBoundRateMax: outBoundRateMax,
compBytesBefore: compBytesBefore,
compBytesAfter: compBytesAfter,
compRateAve: compRateAve,
}
}
func (m *muxerMetrics) update(connectionID string, metrics *h2mux.MuxerMetrics) {
m.rtt.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTT))
m.rttMin.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTTMin))
m.rttMax.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTTMax))
m.receiveWindowAve.WithLabelValues(connectionID).Set(metrics.ReceiveWindowAve)
m.sendWindowAve.WithLabelValues(connectionID).Set(metrics.SendWindowAve)
m.receiveWindowMin.WithLabelValues(connectionID).Set(float64(metrics.ReceiveWindowMin))
m.receiveWindowMax.WithLabelValues(connectionID).Set(float64(metrics.ReceiveWindowMax))
m.sendWindowMin.WithLabelValues(connectionID).Set(float64(metrics.SendWindowMin))
m.sendWindowMax.WithLabelValues(connectionID).Set(float64(metrics.SendWindowMax))
m.inBoundRateCurr.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateCurr))
m.inBoundRateMin.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateMin))
m.inBoundRateMax.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateMax))
m.outBoundRateCurr.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateCurr))
m.outBoundRateMin.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateMin))
m.outBoundRateMax.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateMax))
m.compBytesBefore.WithLabelValues(connectionID).Set(float64(metrics.CompBytesBefore.Value()))
m.compBytesAfter.WithLabelValues(connectionID).Set(float64(metrics.CompBytesAfter.Value()))
m.compRateAve.WithLabelValues(connectionID).Set(float64(metrics.CompRateAve()))
}
func convertRTTMilliSec(t time.Duration) float64 {
return float64(t / time.Millisecond)
}
// Metrics that can be collected without asking the edge
func newTunnelMetrics(protocol Protocol) *tunnelMetrics {
maxConcurrentRequestsPerTunnel := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: TunnelSubsystem,
Name: "max_concurrent_requests_per_tunnel",
Help: "Largest number of concurrent requests proxied through each tunnel so far",
},
[]string{"connection_id"},
)
prometheus.MustRegister(maxConcurrentRequestsPerTunnel)
timerRetries := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: TunnelSubsystem,
Name: "timer_retries",
Help: "Unacknowledged heart beats count",
})
prometheus.MustRegister(timerRetries)
serverLocations := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: TunnelSubsystem,
Name: "server_locations",
Help: "Where each tunnel is connected to. 1 means current location, 0 means previous locations.",
},
[]string{"connection_id", "location"},
)
prometheus.MustRegister(serverLocations)
rpcFail := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: TunnelSubsystem,
Name: "tunnel_rpc_fail",
Help: "Count of RPC connection errors by type",
},
[]string{"error", "rpcName"},
)
prometheus.MustRegister(rpcFail)
registerFail := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: TunnelSubsystem,
Name: "tunnel_register_fail",
Help: "Count of tunnel registration errors by type",
},
[]string{"error", "rpcName"},
)
prometheus.MustRegister(registerFail)
userHostnamesCounts := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: TunnelSubsystem,
Name: "user_hostnames_counts",
Help: "Which user hostnames cloudflared is serving",
},
[]string{"userHostname"},
)
prometheus.MustRegister(userHostnamesCounts)
registerSuccess := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: TunnelSubsystem,
Name: "tunnel_register_success",
Help: "Count of successful tunnel registrations",
},
[]string{"rpcName"},
)
prometheus.MustRegister(registerSuccess)
var muxerMetrics *muxerMetrics
if protocol == H2mux {
muxerMetrics = newMuxerMetrics()
}
return &tunnelMetrics{
timerRetries: timerRetries,
serverLocations: serverLocations,
oldServerLocations: make(map[string]string),
muxerMetrics: muxerMetrics,
tunnelsHA: NewTunnelsForHA(),
regSuccess: registerSuccess,
regFail: registerFail,
rpcFail: rpcFail,
userHostnamesCounts: userHostnamesCounts,
}
}
func (t *tunnelMetrics) updateMuxerMetrics(connectionID string, metrics *h2mux.MuxerMetrics) {
t.muxerMetrics.update(connectionID, metrics)
}
func (t *tunnelMetrics) registerServerLocation(connectionID, loc string) {
t.locationLock.Lock()
defer t.locationLock.Unlock()
if oldLoc, ok := t.oldServerLocations[connectionID]; ok && oldLoc == loc {
return
} else if ok {
t.serverLocations.WithLabelValues(connectionID, oldLoc).Dec()
}
t.serverLocations.WithLabelValues(connectionID, loc).Inc()
t.oldServerLocations[connectionID] = loc
}
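
registerServerLocation keeps exactly one location gauge at 1 per connection: moving to a new edge location decrements the old gauge and increments the new one, and re-registering the same location is a no-op. A short illustrative sequence (hypothetical usage; note that newTunnelMetrics can only run once per process because it uses prometheus.MustRegister):

package connection

func demoServerLocations() {
	m := newTunnelMetrics(HTTP2)         // registers collectors; callable once per process
	m.registerServerLocation("0", "LHR") // cloudflared_tunnel_server_locations{connection_id="0",location="LHR"} = 1
	m.registerServerLocation("0", "AMS") // LHR gauge decremented to 0, AMS gauge set to 1
	m.registerServerLocation("0", "AMS") // no-op: unchanged location returns early
}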

(deleted file)

@@ -1,118 +0,0 @@
package connection
import (
"fmt"
"math"
"math/rand"
"net"
"reflect"
"testing/quick"
)
type mockAddrs struct {
// a set of synthetic SRV records
addrMap map[net.SRV][]*net.TCPAddr
// the total number of addresses, aggregated across addrMap.
// For the convenience of test code that would otherwise have to compute
// this by hand every time.
numAddrs int
}
func newMockAddrs(port uint16, numRegions uint8, numAddrsPerRegion uint8) mockAddrs {
addrMap := make(map[net.SRV][]*net.TCPAddr)
numAddrs := 0
for r := uint8(0); r < numRegions; r++ {
var (
srv = net.SRV{Target: fmt.Sprintf("test-region-%v.example.com", r), Port: port}
addrs []*net.TCPAddr
)
for a := uint8(0); a < numAddrsPerRegion; a++ {
addrs = append(addrs, &net.TCPAddr{
IP: net.ParseIP(fmt.Sprintf("10.0.%v.%v", r, a)),
Port: int(port),
})
}
addrMap[srv] = addrs
numAddrs += len(addrs)
}
return mockAddrs{addrMap: addrMap, numAddrs: numAddrs}
}
var _ quick.Generator = mockAddrs{}
func (mockAddrs) Generate(rand *rand.Rand, size int) reflect.Value {
port := uint16(rand.Intn(math.MaxUint16))
numRegions := uint8(1 + rand.Intn(10))
numAddrsPerRegion := uint8(1 + rand.Intn(32))
result := newMockAddrs(port, numRegions, numAddrsPerRegion)
return reflect.ValueOf(result)
}
// Returns a function compatible with net.LookupSRV that will return the SRV
// records from mockAddrs.
func mockNetLookupSRV(
m mockAddrs,
) func(service, proto, name string) (cname string, addrs []*net.SRV, err error) {
var addrs []*net.SRV
for k := range m.addrMap {
addr := k
addrs = append(addrs, &addr)
// We can't just do
// addrs = append(addrs, &k)
// `k` will be reused by subsequent loop iterations,
// so all the copies of `&k` would point to the same location.
}
return func(_, _, _ string) (string, []*net.SRV, error) {
return "", addrs, nil
}
}
// Returns a function compatible with net.LookupIP that translates the SRV records
// from mockAddrs into IP addresses, based on the TCP addresses in mockAddrs.
func mockNetLookupIP(
m mockAddrs,
) func(host string) ([]net.IP, error) {
return func(host string) ([]net.IP, error) {
for srv, tcpAddrs := range m.addrMap {
if srv.Target != host {
continue
}
result := make([]net.IP, len(tcpAddrs))
for i, tcpAddr := range tcpAddrs {
result[i] = tcpAddr.IP
}
return result, nil
}
return nil, fmt.Errorf("No IPs for %v", host)
}
}
type mockEdgeServiceDiscoverer struct {
}
func (mr *mockEdgeServiceDiscoverer) Addr() (*net.TCPAddr, error) {
return &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 63102,
}, nil
}
func (mr *mockEdgeServiceDiscoverer) AnyAddr() (*net.TCPAddr, error) {
return &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 63102,
}, nil
}
func (mr *mockEdgeServiceDiscoverer) ReplaceAddr(addr *net.TCPAddr) {}
func (mr *mockEdgeServiceDiscoverer) MarkAddrBad(addr *net.TCPAddr) {}
func (mr *mockEdgeServiceDiscoverer) AvailableAddrs() int {
return 1
}
func (mr *mockEdgeServiceDiscoverer) Refresh() error {
return nil
}

connection/observer.go (new file)

@@ -0,0 +1,99 @@
package connection
import (
"fmt"
"net/url"
"strings"
"github.com/cloudflare/cloudflared/cmd/cloudflared/ui"
"github.com/cloudflare/cloudflared/logger"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
type Observer struct {
logger.Service
metrics *tunnelMetrics
tunnelEventChan chan<- ui.TunnelEvent
}
func NewObserver(logger logger.Service, tunnelEventChan chan<- ui.TunnelEvent, protocol Protocol) *Observer {
return &Observer{
logger,
newTunnelMetrics(protocol),
tunnelEventChan,
}
}
func (o *Observer) logServerInfo(connectionID uint8, location, msg string) {
// If launch-ui flag is set, send connect msg
if o.tunnelEventChan != nil {
o.tunnelEventChan <- ui.TunnelEvent{Index: connectionID, EventType: ui.Connected, Location: location}
}
o.Infof(msg)
o.metrics.registerServerLocation(uint8ToString(connectionID), location)
}
func (o *Observer) logTrialHostname(registration *tunnelpogs.TunnelRegistration) error {
// Print out the user's trial zone URL in a nice box (if they requested and got one and UI flag is not set)
if o.tunnelEventChan == nil {
if registrationURL, err := url.Parse(registration.Url); err == nil {
for _, line := range asciiBox(trialZoneMsg(registrationURL.String()), 2) {
o.Info(line)
}
} else {
o.Error("Failed to connect tunnel, please try again.")
return fmt.Errorf("empty URL in response from Cloudflare edge")
}
}
return nil
}
// Print out the given lines in a nice ASCII box.
func asciiBox(lines []string, padding int) (box []string) {
maxLen := maxLen(lines)
spacer := strings.Repeat(" ", padding)
border := "+" + strings.Repeat("-", maxLen+(padding*2)) + "+"
box = append(box, border)
for _, line := range lines {
box = append(box, "|"+spacer+line+strings.Repeat(" ", maxLen-len(line))+spacer+"|")
}
box = append(box, border)
return
}
func maxLen(lines []string) int {
max := 0
for _, line := range lines {
if len(line) > max {
max = len(line)
}
}
return max
}
func trialZoneMsg(url string) []string {
return []string{
"Your free tunnel has started! Visit it:",
" " + url,
}
}
func (o *Observer) sendRegisteringEvent() {
if o.tunnelEventChan != nil {
o.tunnelEventChan <- ui.TunnelEvent{EventType: ui.RegisteringTunnel}
}
}
func (o *Observer) sendConnectedEvent(connIndex uint8, location string) {
if o.tunnelEventChan != nil {
o.tunnelEventChan <- ui.TunnelEvent{Index: connIndex, EventType: ui.Connected, Location: location}
}
}
func (o *Observer) sendURL(url string) {
if o.tunnelEventChan != nil {
o.tunnelEventChan <- ui.TunnelEvent{EventType: ui.SetUrl, Url: url}
}
}
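
For trial zones, asciiBox and trialZoneMsg render the free tunnel URL in a bordered box in the logs. A small usage sketch (illustrative only; the URL is a placeholder):

package connection

import "fmt"

func demoTrialBox() {
	// Prints roughly:
	// +--------------------------------------------+
	// |  Your free tunnel has started! Visit it:   |
	// |    https://example.trycloudflare.com       |
	// +--------------------------------------------+
	for _, line := range asciiBox(trialZoneMsg("https://example.trycloudflare.com"), 2) {
		fmt.Println(line)
	}
}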

(new file)

@@ -0,0 +1,45 @@
package connection
import (
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/assert"
)
// newTunnelMetrics can only be called once per process, because it registers its collectors with prometheus.MustRegister
var m = newTunnelMetrics(H2mux)
func TestRegisterServerLocation(t *testing.T) {
tunnels := 20
var wg sync.WaitGroup
wg.Add(tunnels)
for i := 0; i < tunnels; i++ {
go func(i int) {
id := strconv.Itoa(i)
m.registerServerLocation(id, "LHR")
wg.Done()
}(i)
}
wg.Wait()
for i := 0; i < tunnels; i++ {
id := strconv.Itoa(i)
assert.Equal(t, "LHR", m.oldServerLocations[id])
}
wg.Add(tunnels)
for i := 0; i < tunnels; i++ {
go func(i int) {
id := strconv.Itoa(i)
m.registerServerLocation(id, "AUS")
wg.Done()
}(i)
}
wg.Wait()
for i := 0; i < tunnels; i++ {
id := strconv.Itoa(i)
assert.Equal(t, "AUS", m.oldServerLocations[id])
}
}

(modified file)

@@ -2,40 +2,276 @@ package connection
 import (
 	"context"
+	"fmt"
 	"io"
-	rpc "zombiezen.com/go/capnproto2/rpc"
 	"github.com/cloudflare/cloudflared/logger"
 	"github.com/cloudflare/cloudflared/tunnelrpc"
 	tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
+	"zombiezen.com/go/capnproto2/rpc"
 )
-// NewTunnelRPCClient creates and returns a new RPC client, which will communicate
-// using a stream on the given muxer
-func NewTunnelRPCClient(
+type tunnelServerClient struct {
+	client    tunnelpogs.TunnelServer_PogsClient
+	transport rpc.Transport
+}
+// NewTunnelServerClient creates and returns a new RPC client, which will communicate using a stream on the given muxer.
+// This method is exported for supervisor to call Authenticate RPC
+func NewTunnelServerClient(
 	ctx context.Context,
 	stream io.ReadWriteCloser,
 	logger logger.Service,
-) (client tunnelpogs.TunnelServer_PogsClient, err error) {
+) *tunnelServerClient {
+	transport := tunnelrpc.NewTransportLogger(logger, rpc.StreamTransport(stream))
 	conn := rpc.NewConn(
-		tunnelrpc.NewTransportLogger(logger, rpc.StreamTransport(stream)),
+		transport,
 		tunnelrpc.ConnLog(logger),
 	)
 	registrationClient := tunnelpogs.RegistrationServer_PogsClient{Client: conn.Bootstrap(ctx), Conn: conn}
-	client = tunnelpogs.TunnelServer_PogsClient{RegistrationServer_PogsClient: registrationClient, Client: conn.Bootstrap(ctx), Conn: conn}
-	return client, nil
+	return &tunnelServerClient{
+		client:    tunnelpogs.TunnelServer_PogsClient{RegistrationServer_PogsClient: registrationClient, Client: conn.Bootstrap(ctx), Conn: conn},
+		transport: transport,
+	}
 }
-func NewRegistrationRPCClient(
+func (tsc *tunnelServerClient) Authenticate(ctx context.Context, classicTunnel *ClassicTunnelConfig, registrationOptions *tunnelpogs.RegistrationOptions) (tunnelpogs.AuthOutcome, error) {
+	authResp, err := tsc.client.Authenticate(ctx, classicTunnel.OriginCert, classicTunnel.Hostname, registrationOptions)
+	if err != nil {
+		return nil, err
+	}
+	return authResp.Outcome(), nil
+}
+func (tsc *tunnelServerClient) Close() {
+	// Closing the client will also close the connection
+	tsc.client.Close()
+	tsc.transport.Close()
+}
+type registrationServerClient struct {
+	client    tunnelpogs.RegistrationServer_PogsClient
+	transport rpc.Transport
+}
+func newRegistrationRPCClient(
 	ctx context.Context,
 	stream io.ReadWriteCloser,
 	logger logger.Service,
-) (client tunnelpogs.RegistrationServer_PogsClient, err error) {
+) *registrationServerClient {
+	transport := tunnelrpc.NewTransportLogger(logger, rpc.StreamTransport(stream))
 	conn := rpc.NewConn(
-		tunnelrpc.NewTransportLogger(logger, rpc.StreamTransport(stream)),
+		transport,
 		tunnelrpc.ConnLog(logger),
 	)
-	client = tunnelpogs.RegistrationServer_PogsClient{Client: conn.Bootstrap(ctx), Conn: conn}
-	return client, nil
+	return &registrationServerClient{
+		client:    tunnelpogs.RegistrationServer_PogsClient{Client: conn.Bootstrap(ctx), Conn: conn},
+		transport: transport,
+	}
 }
+func (rsc *registrationServerClient) close() {
+	// Closing the client will also close the connection
+	rsc.client.Close()
+	// Closing the transport also closes the stream
+	rsc.transport.Close()
+}
type rpcName string
const (
register rpcName = "register"
reconnect rpcName = "reconnect"
unregister rpcName = "unregister"
authenticate rpcName = "authenticate"
)
func registerConnection(
ctx context.Context,
rpcClient *registrationServerClient,
config *NamedTunnelConfig,
options *tunnelpogs.ConnectionOptions,
connIndex uint8,
observer *Observer,
) error {
conn, err := rpcClient.client.RegisterConnection(
ctx,
config.Auth,
config.ID,
connIndex,
options,
)
if err != nil {
if err.Error() == DuplicateConnectionError {
observer.metrics.regFail.WithLabelValues("dup_edge_conn", "registerConnection").Inc()
return errDuplicationConnection
}
observer.metrics.regFail.WithLabelValues("server_error", "registerConnection").Inc()
return serverRegistrationErrorFromRPC(err)
}
observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc()
observer.logServerInfo(connIndex, conn.Location, fmt.Sprintf("Connection %d registered with %s using ID %s", connIndex, conn.Location, conn.UUID))
observer.sendConnectedEvent(connIndex, conn.Location)
return nil
}
func (h *h2muxConnection) registerTunnel(ctx context.Context, credentialSetter CredentialManager, classicTunnel *ClassicTunnelConfig, registrationOptions *tunnelpogs.RegistrationOptions) error {
h.observer.sendRegisteringEvent()
stream, err := h.newRPCStream(ctx, register)
if err != nil {
return err
}
rpcClient := NewTunnelServerClient(ctx, stream, h.observer)
defer rpcClient.Close()
h.logServerInfo(ctx, rpcClient)
registration := rpcClient.client.RegisterTunnel(
ctx,
classicTunnel.OriginCert,
classicTunnel.Hostname,
registrationOptions,
)
if registrationErr := registration.DeserializeError(); registrationErr != nil {
// RegisterTunnel RPC failure
return h.processRegisterTunnelError(registrationErr, register)
}
// Send free tunnel URL to UI
h.observer.sendURL(registration.Url)
credentialSetter.SetEventDigest(h.connIndex, registration.EventDigest)
return h.processRegistrationSuccess(registration, register, credentialSetter, classicTunnel)
}
type CredentialManager interface {
ReconnectToken() ([]byte, error)
EventDigest(connID uint8) ([]byte, error)
SetEventDigest(connID uint8, digest []byte)
ConnDigest(connID uint8) ([]byte, error)
SetConnDigest(connID uint8, digest []byte)
}
func (h *h2muxConnection) processRegistrationSuccess(
registration *tunnelpogs.TunnelRegistration,
name rpcName,
credentialManager CredentialManager, classicTunnel *ClassicTunnelConfig,
) error {
for _, logLine := range registration.LogLines {
h.observer.Info(logLine)
}
if registration.TunnelID != "" {
h.observer.metrics.tunnelsHA.AddTunnelID(h.connIndex, registration.TunnelID)
h.observer.Infof("Each HA connection's tunnel IDs: %v", h.observer.metrics.tunnelsHA.String())
}
// Print out the user's trial zone URL in a nice box (if they requested and got one and UI flag is not set)
if classicTunnel.IsTrialZone() {
err := h.observer.logTrialHostname(registration)
if err != nil {
return err
}
}
credentialManager.SetConnDigest(h.connIndex, registration.ConnDigest)
h.observer.metrics.userHostnamesCounts.WithLabelValues(registration.Url).Inc()
h.observer.Infof("Route propagating, it may take up to 1 minute for your new route to become functional")
h.observer.metrics.regSuccess.WithLabelValues(string(name)).Inc()
return nil
}
func (h *h2muxConnection) processRegisterTunnelError(err tunnelpogs.TunnelRegistrationError, name rpcName) error {
if err.Error() == DuplicateConnectionError {
h.observer.metrics.regFail.WithLabelValues("dup_edge_conn", string(name)).Inc()
return errDuplicationConnection
}
h.observer.metrics.regFail.WithLabelValues("server_error", string(name)).Inc()
return serverRegisterTunnelError{
cause: err,
permanent: err.IsPermanent(),
}
}
func (h *h2muxConnection) reconnectTunnel(ctx context.Context, credentialManager CredentialManager, classicTunnel *ClassicTunnelConfig, registrationOptions *tunnelpogs.RegistrationOptions) error {
token, err := credentialManager.ReconnectToken()
if err != nil {
return err
}
eventDigest, err := credentialManager.EventDigest(h.connIndex)
if err != nil {
return err
}
connDigest, err := credentialManager.ConnDigest(h.connIndex)
if err != nil {
return err
}
h.observer.Debug("initiating RPC stream to reconnect")
stream, err := h.newRPCStream(ctx, register)
if err != nil {
return err
}
rpcClient := NewTunnelServerClient(ctx, stream, h.observer)
defer rpcClient.Close()
h.logServerInfo(ctx, rpcClient)
registration := rpcClient.client.ReconnectTunnel(
ctx,
token,
eventDigest,
connDigest,
classicTunnel.Hostname,
registrationOptions,
)
if registrationErr := registration.DeserializeError(); registrationErr != nil {
// ReconnectTunnel RPC failure
return h.processRegisterTunnelError(registrationErr, reconnect)
}
return h.processRegistrationSuccess(registration, reconnect, credentialManager, classicTunnel)
}
func (h *h2muxConnection) logServerInfo(ctx context.Context, rpcClient *tunnelServerClient) error {
// Request server info without blocking tunnel registration; must use capnp library directly.
serverInfoPromise := tunnelrpc.TunnelServer{Client: rpcClient.client.Client}.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error {
return nil
})
serverInfoMessage, err := serverInfoPromise.Result().Struct()
if err != nil {
h.observer.Errorf("Failed to retrieve server information: %s", err)
return err
}
serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage)
if err != nil {
h.observer.Errorf("Failed to retrieve server information: %s", err)
return err
}
h.observer.logServerInfo(h.connIndex, serverInfo.LocationName, fmt.Sprintf("Connection %d connected to %s", h.connIndex, serverInfo.LocationName))
return nil
}
func (h *h2muxConnection) unregister(isNamedTunnel bool) {
unregisterCtx, cancel := context.WithTimeout(context.Background(), h.config.GracePeriod)
defer cancel()
stream, err := h.newRPCStream(unregisterCtx, register)
if err != nil {
return
}
if isNamedTunnel {
rpcClient := newRegistrationRPCClient(unregisterCtx, stream, h.observer)
defer rpcClient.close()
rpcClient.client.UnregisterConnection(unregisterCtx)
} else {
rpcClient := NewTunnelServerClient(unregisterCtx, stream, h.observer)
defer rpcClient.Close()
// gracePeriod is encoded in int64 using capnproto
rpcClient.client.UnregisterTunnel(unregisterCtx, h.config.GracePeriod.Nanoseconds())
}
}
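
registerTunnel and reconnectTunnel lean entirely on the CredentialManager interface for reconnect tokens and per-connection digests. Below is a sketch of an in-memory implementation, assuming digests keyed by connection index behind a mutex; memCredentialManager is hypothetical, and cloudflared's real manager lives elsewhere in the codebase.

package connection

import (
	"fmt"
	"sync"
)

// memCredentialManager is an illustrative in-memory CredentialManager.
type memCredentialManager struct {
	mu           sync.RWMutex
	token        []byte
	eventDigests map[uint8][]byte
	connDigests  map[uint8][]byte
}

// Compile-time check that the sketch satisfies the interface above.
var _ CredentialManager = (*memCredentialManager)(nil)

func newMemCredentialManager(token []byte) *memCredentialManager {
	return &memCredentialManager{
		token:        token,
		eventDigests: make(map[uint8][]byte),
		connDigests:  make(map[uint8][]byte),
	}
}

func (m *memCredentialManager) ReconnectToken() ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if m.token == nil {
		return nil, fmt.Errorf("no reconnect token stored")
	}
	return m.token, nil
}

func (m *memCredentialManager) EventDigest(connID uint8) ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	digest, ok := m.eventDigests[connID]
	if !ok {
		return nil, fmt.Errorf("no event digest for connection %d", connID)
	}
	return digest, nil
}

func (m *memCredentialManager) SetEventDigest(connID uint8, digest []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.eventDigests[connID] = digest
}

func (m *memCredentialManager) ConnDigest(connID uint8) ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	digest, ok := m.connDigests[connID]
	if !ok {
		return nil, fmt.Errorf("no connection digest for connection %d", connID)
	}
	return digest, nil
}

func (m *memCredentialManager) SetConnDigest(connID uint8, digest []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.connDigests[connID] = digest
}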

(new file)

@@ -0,0 +1,50 @@
package connection
import (
"fmt"
"sync"
"github.com/prometheus/client_golang/prometheus"
)
// tunnelsForHA maps this cloudflared instance's HA connections to the tunnel IDs they serve.
type tunnelsForHA struct {
sync.Mutex
metrics *prometheus.GaugeVec
entries map[uint8]string
}
// NewTunnelsForHA initializes and registers the Prometheus metrics for a tunnelsForHA.
func NewTunnelsForHA() tunnelsForHA {
metrics := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "tunnel_ids",
Help: "The ID of all tunnels (and their corresponding HA connection ID) running in this instance of cloudflared.",
},
[]string{"tunnel_id", "ha_conn_id"},
)
prometheus.MustRegister(metrics)
return tunnelsForHA{
metrics: metrics,
entries: make(map[uint8]string),
}
}
// AddTunnelID tracks a new tunnel ID, removes the disconnected tunnel's entry (if any), and updates the metrics.
func (t *tunnelsForHA) AddTunnelID(haConn uint8, tunnelID string) {
t.Lock()
defer t.Unlock()
haStr := fmt.Sprintf("%v", haConn)
if oldTunnelID, ok := t.entries[haConn]; ok {
t.metrics.WithLabelValues(oldTunnelID, haStr).Dec()
}
t.entries[haConn] = tunnelID
t.metrics.WithLabelValues(tunnelID, haStr).Inc()
}
func (t *tunnelsForHA) String() string {
t.Lock()
defer t.Unlock()
return fmt.Sprintf("%v", t.entries)
}
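
AddTunnelID keeps the tunnel_ids gauge consistent when an HA connection is handed a new tunnel: the old pairing drops to 0 and the new one rises to 1. Illustrative usage with hypothetical values:

package connection

func demoTunnelsForHA() {
	t := NewTunnelsForHA()
	t.AddTunnelID(0, "tunnel-a") // tunnel_ids{tunnel_id="tunnel-a",ha_conn_id="0"} = 1
	t.AddTunnelID(0, "tunnel-b") // tunnel-a gauge decremented, tunnel-b gauge incremented
	_ = t.String()               // "map[0:tunnel-b]"
}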