package origin import ( "context" "crypto/tls" "fmt" "net" "net/http" "net/url" "strconv" "strings" "sync" "time" "github.com/google/uuid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" "github.com/cloudflare/cloudflared/buffer" "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo" "github.com/cloudflare/cloudflared/cmd/cloudflared/ui" "github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/h2mux" "github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/logger" "github.com/cloudflare/cloudflared/signal" "github.com/cloudflare/cloudflared/tunnelrpc" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" "github.com/cloudflare/cloudflared/websocket" ) const ( dialTimeout = 15 * time.Second openStreamTimeout = 30 * time.Second muxerTimeout = 5 * time.Second lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;" TagHeaderNamePrefix = "Cf-Warp-Tag-" DuplicateConnectionError = "EDUPCONN" FeatureSerializedHeaders = "serialized_headers" FeatureQuickReconnects = "quick_reconnects" ) type rpcName string const ( register rpcName = "register" reconnect rpcName = "reconnect" unregister rpcName = "unregister" authenticate rpcName = " authenticate" ) type TunnelConfig struct { BuildInfo *buildinfo.BuildInfo ClientID string CloseConnOnce *sync.Once // Used to close connectedSignal no more than once CompressionQuality uint64 EdgeAddrs []string GracePeriod time.Duration HAConnections int HeartbeatInterval time.Duration Hostname string IncidentLookup IncidentLookup IsAutoupdated bool IsFreeTunnel bool LBPool string Logger logger.Service TransportLogger logger.Service MaxHeartbeats uint64 Metrics *TunnelMetrics MetricsUpdateFreq time.Duration OriginCert []byte ReportedVersion string Retries uint RunFromTerminal bool Tags []tunnelpogs.Tag TlsConfig *tls.Config WSGI bool // feature-flag to use new edge reconnect tokens UseReconnectToken bool NamedTunnel *NamedTunnelConfig ReplaceExisting bool TunnelEventChan chan<- ui.TunnelEvent IngressRules ingress.Ingress } type dupConnRegisterTunnelError struct{} var errDuplicationConnection = &dupConnRegisterTunnelError{} func (e dupConnRegisterTunnelError) Error() string { return "already connected to this server, trying another address" } type muxerShutdownError struct{} func (e muxerShutdownError) Error() string { return "muxer shutdown" } // RegisterTunnel error from server type serverRegisterTunnelError struct { cause error permanent bool } func (e serverRegisterTunnelError) Error() string { return e.cause.Error() } // RegisterTunnel error from client type clientRegisterTunnelError struct { cause error } func newRPCError(cause error, counter *prometheus.CounterVec, name rpcName) clientRegisterTunnelError { counter.WithLabelValues(cause.Error(), string(name)).Inc() return clientRegisterTunnelError{cause: cause} } func (e clientRegisterTunnelError) Error() string { return e.cause.Error() } func (c *TunnelConfig) muxerConfig(handler h2mux.MuxedStreamHandler) h2mux.MuxerConfig { return h2mux.MuxerConfig{ Timeout: muxerTimeout, Handler: handler, IsClient: true, HeartbeatInterval: c.HeartbeatInterval, MaxHeartbeats: c.MaxHeartbeats, Logger: c.TransportLogger, CompressionQuality: h2mux.CompressionSetting(c.CompressionQuality), } } func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions { policy := tunnelrpc.ExistingTunnelPolicy_balance if c.HAConnections <= 1 && c.LBPool == "" { policy = tunnelrpc.ExistingTunnelPolicy_disconnect } return &tunnelpogs.RegistrationOptions{ ClientID: c.ClientID, Version: c.ReportedVersion, OS: fmt.Sprintf("%s_%s", c.BuildInfo.GoOS, c.BuildInfo.GoArch), ExistingTunnelPolicy: policy, PoolName: c.LBPool, Tags: c.Tags, ConnectionID: connectionID, OriginLocalIP: OriginLocalIP, IsAutoupdated: c.IsAutoupdated, RunFromTerminal: c.RunFromTerminal, CompressionQuality: c.CompressionQuality, UUID: uuid.String(), Features: c.SupportedFeatures(), } } func (c *TunnelConfig) ConnectionOptions(originLocalAddr string, numPreviousAttempts uint8) *tunnelpogs.ConnectionOptions { // attempt to parse out origin IP, but don't fail since it's informational field host, _, _ := net.SplitHostPort(originLocalAddr) originIP := net.ParseIP(host) return &tunnelpogs.ConnectionOptions{ Client: c.NamedTunnel.Client, OriginLocalIP: originIP, ReplaceExisting: c.ReplaceExisting, CompressionQuality: uint8(c.CompressionQuality), NumPreviousAttempts: numPreviousAttempts, } } func (c *TunnelConfig) SupportedFeatures() []string { features := []string{FeatureSerializedHeaders} if c.NamedTunnel == nil { features = append(features, FeatureQuickReconnects) } return features } func (c *TunnelConfig) IsTrialTunnel() bool { return c.Hostname == "" } type NamedTunnelConfig struct { Auth pogs.TunnelAuth ID uuid.UUID Client pogs.ClientInfo Protocol Protocol } type Protocol int64 const ( h2muxProtocol Protocol = iota http2Protocol ) func ParseProtocol(s string) (Protocol, bool) { switch s { case "h2mux": return h2muxProtocol, true case "http2": return http2Protocol, true default: return 0, false } } func StartTunnelDaemon(ctx context.Context, config *TunnelConfig, connectedSignal *signal.Signal, cloudflaredID uuid.UUID, reconnectCh chan ReconnectSignal) error { s, err := NewSupervisor(config, cloudflaredID) if err != nil { return err } return s.Run(ctx, connectedSignal, reconnectCh) } func ServeTunnelLoop(ctx context.Context, credentialManager *reconnectCredentialManager, config *TunnelConfig, addr *net.TCPAddr, connectionIndex uint8, connectedSignal *signal.Signal, cloudflaredUUID uuid.UUID, bufferPool *buffer.Pool, reconnectCh chan ReconnectSignal, ) error { config.Metrics.incrementHaConnections() defer config.Metrics.decrementHaConnections() backoff := BackoffHandler{MaxRetries: config.Retries} connectedFuse := h2mux.NewBooleanFuse() go func() { if connectedFuse.Await() { connectedSignal.Notify() } }() // Ensure the above goroutine will terminate if we return without connecting defer connectedFuse.Fuse(false) for { err, recoverable := ServeTunnel( ctx, credentialManager, config, config.Logger, addr, connectionIndex, connectedFuse, &backoff, cloudflaredUUID, bufferPool, reconnectCh, ) if recoverable { if duration, ok := backoff.GetBackoffDuration(ctx); ok { if config.TunnelEventChan != nil { config.TunnelEventChan <- ui.TunnelEvent{Index: connectionIndex, EventType: ui.Reconnecting} } config.Logger.Infof("Retrying connection %d in %s seconds", connectionIndex, duration) backoff.Backoff(ctx) continue } } return err } } func ServeTunnel( ctx context.Context, credentialManager *reconnectCredentialManager, config *TunnelConfig, logger logger.Service, addr *net.TCPAddr, connectionIndex uint8, connectedFuse *h2mux.BooleanFuse, backoff *BackoffHandler, cloudflaredUUID uuid.UUID, bufferPool *buffer.Pool, reconnectCh chan ReconnectSignal, ) (err error, recoverable bool) { // Treat panics as recoverable errors defer func() { if r := recover(); r != nil { var ok bool err, ok = r.(error) if !ok { err = fmt.Errorf("ServeTunnel: %v", r) } recoverable = true } }() // If launch-ui flag is set, send disconnect msg if config.TunnelEventChan != nil { defer func() { config.TunnelEventChan <- ui.TunnelEvent{Index: connectionIndex, EventType: ui.Disconnected} }() } connectionTag := uint8ToString(connectionIndex) if config.NamedTunnel != nil && config.NamedTunnel.Protocol == http2Protocol { return ServeNamedTunnel(ctx, config, connectionIndex, addr, connectedFuse, reconnectCh) } // Returns error from parsing the origin URL or handshake errors handler, originLocalAddr, err := NewTunnelHandler(ctx, config, addr, connectionIndex, bufferPool) if err != nil { switch err.(type) { case connection.DialError: logger.Errorf("Connection %d unable to dial edge: %s", connectionIndex, err) case h2mux.MuxerHandshakeError: logger.Errorf("Connection %d handshake with edge server failed: %s", connectionIndex, err) default: logger.Errorf("Connection %d failed: %s", connectionIndex, err) return err, false } return err, true } errGroup, serveCtx := errgroup.WithContext(ctx) errGroup.Go(func() (err error) { defer func() { if err == nil { connectedFuse.Fuse(true) backoff.SetGracePeriod() } }() if config.UseReconnectToken && connectedFuse.Value() { err := ReconnectTunnel(serveCtx, handler.muxer, config, logger, connectionIndex, originLocalAddr, cloudflaredUUID, credentialManager) if err == nil { return nil } // log errors and proceed to RegisterTunnel logger.Errorf("Couldn't reconnect connection %d. Reregistering it instead. Error was: %v", connectionIndex, err) } return RegisterTunnel(serveCtx, credentialManager, handler.muxer, config, logger, connectionIndex, originLocalAddr, cloudflaredUUID) }) errGroup.Go(func() error { updateMetricsTickC := time.Tick(config.MetricsUpdateFreq) for { select { case <-serveCtx.Done(): // UnregisterTunnel blocks until the RPC call returns if connectedFuse.Value() { if config.NamedTunnel != nil { _ = UnregisterConnection(ctx, handler.muxer, config) } else { _ = UnregisterTunnel(handler.muxer, config) } } handler.muxer.Shutdown() return nil case <-updateMetricsTickC: handler.UpdateMetrics(connectionTag) } } }) errGroup.Go(func() error { for { select { case reconnect := <-reconnectCh: return &reconnect case <-serveCtx.Done(): return nil } } }) errGroup.Go(func() error { // All routines should stop when muxer finish serving. When muxer is shutdown // gracefully, it doesn't return an error, so we need to return errMuxerShutdown // here to notify other routines to stop err := handler.muxer.Serve(serveCtx) if err == nil { return muxerShutdownError{} } return err }) err = errGroup.Wait() if err != nil { switch err := err.(type) { case *dupConnRegisterTunnelError: // don't retry this connection anymore, let supervisor pick new a address return err, false case *serverRegisterTunnelError: logger.Errorf("Register tunnel error from server side: %s", err.cause) // Don't send registration error return from server to Sentry. They are // logged on server side if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 { logger.Error(activeIncidentsMsg(incidents)) } return err.cause, !err.permanent case *clientRegisterTunnelError: logger.Errorf("Register tunnel error on client side: %s", err.cause) return err, true case *muxerShutdownError: logger.Info("Muxer shutdown") return err, true case *ReconnectSignal: logger.Infof("Restarting connection %d due to reconnect signal in %d seconds", connectionIndex, err.Delay) err.DelayBeforeReconnect() return err, true default: if err == context.Canceled { logger.Debugf("Serve tunnel error: %s", err) return err, false } logger.Errorf("Serve tunnel error: %s", err) return err, true } } return nil, true } func RegisterConnectionWithH2Mux( ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig, connectionIndex uint8, originLocalAddr string, numPreviousAttempts uint8, ) error { const registerConnection = "registerConnection" config.TransportLogger.Debug("initiating RPC stream for RegisterConnection") rpcClient, err := newTunnelRPCClient(ctx, muxer, config, registerConnection) if err != nil { return err } defer rpcClient.Close() conn, err := rpcClient.RegisterConnection( ctx, config.NamedTunnel.Auth, config.NamedTunnel.ID, connectionIndex, config.ConnectionOptions(originLocalAddr, numPreviousAttempts), ) if err != nil { if err.Error() == DuplicateConnectionError { config.Metrics.regFail.WithLabelValues("dup_edge_conn", registerConnection).Inc() return errDuplicationConnection } config.Metrics.regFail.WithLabelValues("server_error", registerConnection).Inc() return serverRegistrationErrorFromRPC(err) } config.Metrics.regSuccess.WithLabelValues(registerConnection).Inc() config.Logger.Infof("Connection %d registered with %s using ID %s", connectionIndex, conn.Location, conn.UUID) // If launch-ui flag is set, send connect msg if config.TunnelEventChan != nil { config.TunnelEventChan <- ui.TunnelEvent{Index: connectionIndex, EventType: ui.Connected, Location: conn.Location} } return nil } func ServeNamedTunnel( ctx context.Context, config *TunnelConfig, connIndex uint8, addr *net.TCPAddr, connectedFuse *h2mux.BooleanFuse, reconnectCh chan ReconnectSignal, ) (err error, recoverable bool) { tlsServerConn, err := connection.DialEdge(ctx, dialTimeout, config.TlsConfig, addr) if err != nil { return err, true } cfdServer, err := newHTTP2Server(config, connIndex, tlsServerConn.LocalAddr(), connectedFuse) if err != nil { return err, false } errGroup, serveCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { cfdServer.serve(serveCtx, tlsServerConn) return fmt.Errorf("Connection with edge closed") }) errGroup.Go(func() error { select { case reconnect := <-reconnectCh: return &reconnect case <-serveCtx.Done(): return nil } }) err = errGroup.Wait() if err != nil { return err, true } return nil, false } func serverRegistrationErrorFromRPC(err error) *serverRegisterTunnelError { if retryable, ok := err.(*tunnelpogs.RetryableError); ok { return &serverRegisterTunnelError{ cause: retryable.Unwrap(), permanent: false, } } return &serverRegisterTunnelError{ cause: err, permanent: true, } } func UnregisterConnection( ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig, ) error { config.TransportLogger.Debug("initiating RPC stream for UnregisterConnection") rpcClient, err := newTunnelRPCClient(ctx, muxer, config, register) if err != nil { // RPC stream open error return err } defer rpcClient.Close() return rpcClient.UnregisterConnection(ctx) } func RegisterTunnel( ctx context.Context, credentialManager *reconnectCredentialManager, muxer *h2mux.Muxer, config *TunnelConfig, logger logger.Service, connectionID uint8, originLocalIP string, uuid uuid.UUID, ) error { config.TransportLogger.Debug("initiating RPC stream to register") if config.TunnelEventChan != nil { config.TunnelEventChan <- ui.TunnelEvent{EventType: ui.RegisteringTunnel} } rpcClient, err := newTunnelRPCClient(ctx, muxer, config, register) if err != nil { return err } defer rpcClient.Close() // Request server info without blocking tunnel registration; must use capnp library directly. serverInfoPromise := tunnelrpc.TunnelServer{Client: rpcClient.Client}.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error { return nil }) LogServerInfo(serverInfoPromise.Result(), connectionID, config.Metrics, logger, config.TunnelEventChan) registration := rpcClient.RegisterTunnel( ctx, config.OriginCert, config.Hostname, config.RegistrationOptions(connectionID, originLocalIP, uuid), ) if registrationErr := registration.DeserializeError(); registrationErr != nil { // RegisterTunnel RPC failure return processRegisterTunnelError(registrationErr, config.Metrics, register) } // Send free tunnel URL to UI if config.TunnelEventChan != nil { config.TunnelEventChan <- ui.TunnelEvent{EventType: ui.SetUrl, Url: registration.Url} } credentialManager.SetEventDigest(connectionID, registration.EventDigest) return processRegistrationSuccess(config, logger, connectionID, registration, register, credentialManager) } func processRegistrationSuccess( config *TunnelConfig, logger logger.Service, connectionID uint8, registration *tunnelpogs.TunnelRegistration, name rpcName, credentialManager *reconnectCredentialManager, ) error { for _, logLine := range registration.LogLines { logger.Info(logLine) } if registration.TunnelID != "" { config.Metrics.tunnelsHA.AddTunnelID(connectionID, registration.TunnelID) logger.Infof("Each HA connection's tunnel IDs: %v", config.Metrics.tunnelsHA.String()) } // Print out the user's trial zone URL in a nice box (if they requested and got one and UI flag is not set) if config.TunnelEventChan == nil { if config.IsTrialTunnel() { if registrationURL, err := url.Parse(registration.Url); err == nil { for _, line := range asciiBox(trialZoneMsg(registrationURL.String()), 2) { logger.Info(line) } } else { logger.Error("Failed to connect tunnel, please try again.") return fmt.Errorf("empty URL in response from Cloudflare edge") } } } credentialManager.SetConnDigest(connectionID, registration.ConnDigest) config.Metrics.userHostnamesCounts.WithLabelValues(registration.Url).Inc() logger.Infof("Route propagating, it may take up to 1 minute for your new route to become functional") config.Metrics.regSuccess.WithLabelValues(string(name)).Inc() return nil } func processRegisterTunnelError(err tunnelpogs.TunnelRegistrationError, metrics *TunnelMetrics, name rpcName) error { if err.Error() == DuplicateConnectionError { metrics.regFail.WithLabelValues("dup_edge_conn", string(name)).Inc() return errDuplicationConnection } metrics.regFail.WithLabelValues("server_error", string(name)).Inc() return serverRegisterTunnelError{ cause: err, permanent: err.IsPermanent(), } } func UnregisterTunnel(muxer *h2mux.Muxer, config *TunnelConfig) error { config.TransportLogger.Debug("initiating RPC stream to unregister") ctx := context.Background() rpcClient, err := newTunnelRPCClient(ctx, muxer, config, unregister) if err != nil { // RPC stream open error return err } defer rpcClient.Close() // gracePeriod is encoded in int64 using capnproto return rpcClient.UnregisterTunnel(ctx, config.GracePeriod.Nanoseconds()) } func LogServerInfo( promise tunnelrpc.ServerInfo_Promise, connectionID uint8, metrics *TunnelMetrics, logger logger.Service, tunnelEventChan chan<- ui.TunnelEvent, ) { serverInfoMessage, err := promise.Struct() if err != nil { logger.Errorf("Failed to retrieve server information: %s", err) return } serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage) if err != nil { logger.Errorf("Failed to retrieve server information: %s", err) return } // If launch-ui flag is set, send connect msg if tunnelEventChan != nil { tunnelEventChan <- ui.TunnelEvent{Index: connectionID, EventType: ui.Connected, Location: serverInfo.LocationName} } logger.Infof("Connected to %s", serverInfo.LocationName) metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName) } func serveWebsocket(wsResp WebsocketResp, req *http.Request, rule *ingress.Rule) (*http.Response, error) { if hostHeader := rule.Config.HTTPHostHeader; hostHeader != "" { req.Header.Set("Host", hostHeader) req.Host = hostHeader } dialler, ok := rule.Service.(websocket.Dialler) if !ok { return nil, fmt.Errorf("Websockets aren't supported by the origin service '%s'", rule.Service) } conn, response, err := websocket.ClientConnect(req, dialler) if err != nil { return nil, err } defer conn.Close() err = wsResp.WriteRespHeaders(response) if err != nil { return nil, errors.Wrap(err, "Error writing response header") } // Copy to/from stream to the undelying connection. Use the underlying // connection because cloudflared doesn't operate on the message themselves websocket.Stream(conn.UnderlyingConn(), wsResp) return response, nil } func uint8ToString(input uint8) string { return strconv.FormatUint(uint64(input), 10) } // Print out the given lines in a nice ASCII box. func asciiBox(lines []string, padding int) (box []string) { maxLen := maxLen(lines) spacer := strings.Repeat(" ", padding) border := "+" + strings.Repeat("-", maxLen+(padding*2)) + "+" box = append(box, border) for _, line := range lines { box = append(box, "|"+spacer+line+strings.Repeat(" ", maxLen-len(line))+spacer+"|") } box = append(box, border) return } func maxLen(lines []string) int { max := 0 for _, line := range lines { if len(line) > max { max = len(line) } } return max } func trialZoneMsg(url string) []string { return []string{ "Your free tunnel has started! Visit it:", " " + url, } } func activeIncidentsMsg(incidents []Incident) string { preamble := "There is an active Cloudflare incident that may be related:" if len(incidents) > 1 { preamble = "There are active Cloudflare incidents that may be related:" } incidentStrings := []string{} for _, incident := range incidents { incidentString := fmt.Sprintf("%s (%s)", incident.Name, incident.URL()) incidentStrings = append(incidentStrings, incidentString) } return preamble + " " + strings.Join(incidentStrings, "; ") } func findCfRayHeader(h1 *http.Request) string { return h1.Header.Get("Cf-Ray") } func isLBProbeRequest(req *http.Request) bool { return strings.HasPrefix(req.UserAgent(), lbProbeUserAgentPrefix) } func newTunnelRPCClient(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig, rpcName rpcName) (tunnelpogs.TunnelServer_PogsClient, error) { openStreamCtx, openStreamCancel := context.WithTimeout(ctx, openStreamTimeout) defer openStreamCancel() stream, err := muxer.OpenRPCStream(openStreamCtx) if err != nil { return tunnelpogs.TunnelServer_PogsClient{}, err } rpcClient, err := connection.NewTunnelRPCClient(ctx, stream, config.TransportLogger) if err != nil { // RPC stream open error return tunnelpogs.TunnelServer_PogsClient{}, newRPCError(err, config.Metrics.rpcFail, rpcName) } return rpcClient, nil }