mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-05-11 21:56:34 +00:00

cloudflared falls back aggressively to HTTP/2 protocol if a connection attempt with QUIC failed. This was done to ensure that machines with UDP egress disabled did not stop clients from connecting to the cloudlfare edge. This PR improves on that experience by having cloudflared remember if a QUIC connection was successful which implies UDP egress works. In this case, cloudflared does not fallback to HTTP/2 and keeps trying to connect to the edge with QUIC.
113 lines
2.8 KiB
Go
113 lines
2.8 KiB
Go
package connection
|
|
|
|
import (
|
|
"net"
|
|
"strings"
|
|
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
const (
|
|
LogFieldLocation = "location"
|
|
LogFieldIPAddress = "ip"
|
|
observerChannelBufferSize = 16
|
|
)
|
|
|
|
type Observer struct {
|
|
log *zerolog.Logger
|
|
logTransport *zerolog.Logger
|
|
metrics *tunnelMetrics
|
|
tunnelEventChan chan Event
|
|
addSinkChan chan EventSink
|
|
}
|
|
|
|
type EventSink interface {
|
|
OnTunnelEvent(event Event)
|
|
}
|
|
|
|
func NewObserver(log, logTransport *zerolog.Logger) *Observer {
|
|
o := &Observer{
|
|
log: log,
|
|
logTransport: logTransport,
|
|
metrics: newTunnelMetrics(),
|
|
tunnelEventChan: make(chan Event, observerChannelBufferSize),
|
|
addSinkChan: make(chan EventSink, observerChannelBufferSize),
|
|
}
|
|
go o.dispatchEvents()
|
|
return o
|
|
}
|
|
|
|
func (o *Observer) RegisterSink(sink EventSink) {
|
|
o.addSinkChan <- sink
|
|
}
|
|
|
|
func (o *Observer) logServerInfo(connIndex uint8, location string, address net.IP, msg string) {
|
|
o.sendEvent(Event{Index: connIndex, EventType: Connected, Location: location})
|
|
o.log.Info().
|
|
Uint8(LogFieldConnIndex, connIndex).
|
|
Str(LogFieldLocation, location).
|
|
IPAddr(LogFieldIPAddress, address).
|
|
Msg(msg)
|
|
o.metrics.registerServerLocation(uint8ToString(connIndex), location)
|
|
}
|
|
|
|
func (o *Observer) sendRegisteringEvent(connIndex uint8) {
|
|
o.sendEvent(Event{Index: connIndex, EventType: RegisteringTunnel})
|
|
}
|
|
|
|
func (o *Observer) sendConnectedEvent(connIndex uint8, protocol Protocol, location string) {
|
|
o.sendEvent(Event{Index: connIndex, EventType: Connected, Protocol: protocol, Location: location})
|
|
}
|
|
|
|
func (o *Observer) SendURL(url string) {
|
|
o.sendEvent(Event{EventType: SetURL, URL: url})
|
|
|
|
if !strings.HasPrefix(url, "https://") {
|
|
// We add https:// in the prefix for backwards compatibility as we used to do that with the old free tunnels
|
|
// and some tools (like `wrangler tail`) are regexp-ing for that specifically.
|
|
url = "https://" + url
|
|
}
|
|
o.metrics.userHostnamesCounts.WithLabelValues(url).Inc()
|
|
}
|
|
|
|
func (o *Observer) SendReconnect(connIndex uint8) {
|
|
o.sendEvent(Event{Index: connIndex, EventType: Reconnecting})
|
|
}
|
|
|
|
func (o *Observer) sendUnregisteringEvent(connIndex uint8) {
|
|
o.sendEvent(Event{Index: connIndex, EventType: Unregistering})
|
|
}
|
|
|
|
func (o *Observer) SendDisconnect(connIndex uint8) {
|
|
o.sendEvent(Event{Index: connIndex, EventType: Disconnected})
|
|
}
|
|
|
|
func (o *Observer) sendEvent(e Event) {
|
|
select {
|
|
case o.tunnelEventChan <- e:
|
|
break
|
|
default:
|
|
o.log.Warn().Msg("observer channel buffer is full")
|
|
}
|
|
}
|
|
|
|
func (o *Observer) dispatchEvents() {
|
|
var sinks []EventSink
|
|
for {
|
|
select {
|
|
case sink := <-o.addSinkChan:
|
|
sinks = append(sinks, sink)
|
|
case evt := <-o.tunnelEventChan:
|
|
for _, sink := range sinks {
|
|
sink.OnTunnelEvent(evt)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type EventSinkFunc func(event Event)
|
|
|
|
func (f EventSinkFunc) OnTunnelEvent(event Event) {
|
|
f(event)
|
|
}
|