cloudflared/connection/connection.go
João "Pisco" Fernandes 73a9980f38 TUN-9255: Improve flush on write conditions in http2 tunnel type to match what is done on the edge
## Summary
We have adapted our edge services to better know when they should flush on write. This is an important
feature to ensure response types like Server Side Events are not buffered, and instead are propagated to the eyeball
as soon as possible. This commit implements, for the http2 tunnel protocol, logic similar to what we use in our edge
services: it adds the newline-delimited JSON content type `application/x-ndjson` to the flushable content types and
also uses the content-length and transfer-encoding headers, following the RFCs:
- https://datatracker.ietf.org/doc/html/rfc7230#section-4.1
- https://datatracker.ietf.org/doc/html/rfc9112#section-6.1

Closes TUN-9255
2025-04-24 11:49:19 +00:00

321 lines
8.7 KiB
Go

package connection
import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/pkg/errors"

	"github.com/cloudflare/cloudflared/tracing"
	"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
	"github.com/cloudflare/cloudflared/websocket"
)
// Constants shared by the connection types and helpers in this file.
const (
// lbProbeUserAgentPrefix is the User-Agent prefix that IsLBProbeRequest matches against.
lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
// LogFieldConnIndex is presumably the log field key for a connection's index; its users are outside this file.
LogFieldConnIndex = "connIndex"
// MaxGracePeriod is a three-minute duration.
// NOTE(review): presumably an upper bound on a shutdown grace period — confirm at the call sites.
MaxGracePeriod = time.Minute * 3
// MaxConcurrentStreams is the largest value representable in a uint32.
MaxConcurrentStreams = math.MaxUint32
// Header names read by shouldFlush. They are lowercase; http.Header.Get canonicalizes the key before lookup.
contentTypeHeader = "content-type"
contentLengthHeader = "content-length"
transferEncodingHeader = "transfer-encoding"
// Content types listed in flushableContentTypes.
sseContentType = "text/event-stream"
grpcContentType = "application/grpc"
sseJsonContentType = "application/x-ndjson"
// chunkTransferEncoding is the Transfer-Encoding token that shouldFlush searches for.
chunkTransferEncoding = "chunked"
)
var (
// switchingProtocolText is the status text for a 101 response: the numeric code followed by its standard
// reason phrase ("101 Switching Protocols").
switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, http.StatusText(http.StatusSwitchingProtocols))
// flushableContentTypes lists the content-type prefixes for which shouldFlush reports true.
flushableContentTypes = []string{sseContentType, grpcContentType, sseJsonContentType}
)
// TunnelConnection represents the connection to the edge.
// The Serve method is provided to allow clients to handle any errors from the connection encountered during
// processing of the connection. Cancelling of the context provided to Serve will close the connection.
type TunnelConnection interface {
// Serve takes the context that controls the connection and returns any error encountered while
// processing it.
Serve(ctx context.Context) error
}
// Orchestrator is the interface this package uses to push and read configuration and to obtain the
// OriginProxy.
// NOTE(review): the implementation is outside this file; the method notes below are read off the signatures only.
type Orchestrator interface {
// UpdateConfig takes a version number and raw configuration bytes and returns an update response.
UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
// GetConfigJSON returns configuration as JSON bytes (presumably the currently applied one — confirm).
GetConfigJSON() ([]byte, error)
// GetOriginProxy returns an OriginProxy, or an error when one cannot be provided.
GetOriginProxy() (OriginProxy, error)
}
// TunnelProperties groups a tunnel's credentials, client information and quick-tunnel URL.
type TunnelProperties struct {
// Credentials holds the tunnel's credentials (see Credentials).
Credentials Credentials
// Client carries the pogs.ClientInfo for this tunnel.
Client pogs.ClientInfo
// QuickTunnelUrl is presumably empty unless the tunnel is a quick tunnel — TODO confirm with callers.
QuickTunnelUrl string
}
// Credentials holds everything needed to run a tunnel; this is the content kept in the credentials file.
type Credentials struct {
	AccountTag   string
	TunnelSecret []byte
	TunnelID     uuid.UUID
	Endpoint     string
}

// Auth builds the pogs.TunnelAuth for these credentials from the account tag and the tunnel secret.
// TunnelID and Endpoint are not part of the result.
func (c *Credentials) Auth() pogs.TunnelAuth {
	var auth pogs.TunnelAuth
	auth.AccountTag = c.AccountTag
	auth.TunnelSecret = c.TunnelSecret
	return auth
}
// TunnelToken carries the same fields as Credentials, but serializes them to JSON under
// short single-letter keys.
type TunnelToken struct {
// AccountTag is serialized under the key "a".
AccountTag string `json:"a"`
// TunnelSecret is serialized under the key "s".
TunnelSecret []byte `json:"s"`
// TunnelID is serialized under the key "t".
TunnelID uuid.UUID `json:"t"`
// Endpoint is serialized under the key "e" and left out of the JSON when empty.
Endpoint string `json:"e,omitempty"`
}
// Credentials converts the token into the equivalent Credentials value by copying each field across.
func (t TunnelToken) Credentials() Credentials {
	var creds Credentials
	creds.AccountTag = t.AccountTag
	creds.TunnelSecret = t.TunnelSecret
	creds.TunnelID = t.TunnelID
	creds.Endpoint = t.Endpoint
	return creds
}
// Encode serializes the token to JSON and returns that JSON encoded with standard base64.
// It returns an empty string and a wrapped error when JSON marshalling fails.
func (t TunnelToken) Encode() (string, error) {
	raw, err := json.Marshal(t)
	if err != nil {
		return "", errors.Wrap(err, "could not JSON encode token")
	}
	encoded := base64.StdEncoding.EncodeToString(raw)
	return encoded, nil
}
// ClassicTunnelProperties groups a hostname, an origin certificate and a reconnect-token feature flag.
// NOTE(review): judging by the name these belong to "classic" tunnels — confirm against callers.
type ClassicTunnelProperties struct {
Hostname string
// OriginCert holds the raw bytes of the origin certificate.
OriginCert []byte
// feature-flag to use new edge reconnect tokens
UseReconnectToken bool
}
// Type indicates the connection type of the connection.
type Type int

// The connection types known to this package, numbered from zero in declaration order.
const (
	TypeWebsocket Type = iota
	TypeTCP
	TypeControlStream
	TypeHTTP
	TypeConfiguration
)

// shouldFlush returns whether this kind of connection should actively flush data.
// It is true for websocket, TCP and control-stream connections and false for every other type.
func (t Type) shouldFlush() bool {
	switch t {
	case TypeWebsocket, TypeTCP, TypeControlStream:
		return true
	default:
		return false
	}
}

// String returns a human-readable name for the connection type.
// Every declared type has its own name; any other value is rendered as "Unknown Type <n>".
func (t Type) String() string {
	switch t {
	case TypeWebsocket:
		return "websocket"
	case TypeTCP:
		return "tcp"
	case TypeControlStream:
		return "control stream"
	case TypeHTTP:
		return "http"
	case TypeConfiguration:
		// Without this case a declared type was reported as "Unknown Type 4".
		return "configuration"
	default:
		return fmt.Sprintf("Unknown Type %d", t)
	}
}
// OriginProxy is how data flows from cloudflared to the origin services running behind it.
type OriginProxy interface {
// ProxyHTTP proxies the traced HTTP request, writing the response through w. isWebsocket tells the
// implementation whether the request is a websocket one.
ProxyHTTP(w ResponseWriter, tr *tracing.TracedHTTPRequest, isWebsocket bool) error
// ProxyTCP proxies a TCP stream described by req over rwa.
// NOTE(review): ctx presumably bounds the lifetime of the proxied stream — confirm in the implementation.
ProxyTCP(ctx context.Context, rwa ReadWriteAcker, req *TCPRequest) error
}
// TCPRequest defines the input format needed to perform a TCP proxy.
// NOTE(review): the field notes below are inferred from names only; the producers and consumers of these
// values are outside this file.
type TCPRequest struct {
// Dest is presumably the destination address to connect to — TODO confirm format.
Dest string
// CFRay is presumably the request's Cf-Ray identifier (compare FindCfRayHeader).
CFRay string
// LBProbe presumably marks requests originating from a load-balancer probe (compare IsLBProbeRequest).
LBProbe bool
FlowID string
CfTraceID string
// ConnIndex is presumably the index of the tunnel connection carrying this request.
ConnIndex uint8
}
// ReadWriteAcker is a readwriter with the ability to Acknowledge to the downstream (edge) that the origin has
// accepted the connection.
type ReadWriteAcker interface {
io.ReadWriter
// AckConnection sends the acknowledgement. tracePropagation is an optional tracing value to pass along with
// it; the HTTP implementation in this file skips it when empty.
AckConnection(tracePropagation string) error
}
// HTTPResponseReadWriteAcker is an HTTP implementation of ReadWriteAcker.
type HTTPResponseReadWriteAcker struct {
// r is the source for Read; the constructor sets it to the request body.
r io.Reader
// w receives written data and the acknowledgement headers.
w ResponseWriter
// f is flushed after every write that wrote at least one byte.
f http.Flusher
// req is the originating request; AckConnection reads its headers.
req *http.Request
}
// NewHTTPResponseReadWriterAcker builds an HTTPResponseReadWriteAcker that reads from req.Body, writes to w
// and flushes through flusher. The request is kept so that AckConnection can read its headers.
func NewHTTPResponseReadWriterAcker(w ResponseWriter, flusher http.Flusher, req *http.Request) *HTTPResponseReadWriteAcker {
	acker := new(HTTPResponseReadWriteAcker)
	acker.r = req.Body
	acker.w = w
	acker.f = flusher
	acker.req = req
	return acker
}
// Read reads from the underlying reader into p and reports the byte count and error it returned.
func (h *HTTPResponseReadWriteAcker) Read(p []byte) (int, error) {
	n, err := h.r.Read(p)
	return n, err
}
// Write sends p to the response writer and flushes whenever at least one byte was written, even if the
// write also returned an error. The writer's byte count and error are returned unchanged.
func (h *HTTPResponseReadWriteAcker) Write(p []byte) (int, error) {
	written, err := h.w.Write(p)
	if written <= 0 {
		// Nothing reached the writer, so there is nothing to flush.
		return written, err
	}
	h.f.Flush()
	return written, err
}
// AckConnection acknowledges an HTTP connection by writing a 101 Switching Protocols status, which lets the
// caller upgrade to streams. When the request carries a Sec-WebSocket-Key header, the response headers come
// from websocket.NewResponseHeader; otherwise they start out empty. A non-empty tracePropagation is added
// under the cloudflared tracing header.
func (h *HTTPResponseReadWriteAcker) AckConnection(tracePropagation string) error {
	header := http.Header{}
	if h.req.Header.Get("Sec-WebSocket-Key") != "" {
		header = websocket.NewResponseHeader(h.req)
	}
	if tracePropagation != "" {
		header.Add(tracing.CanonicalCloudflaredTracingHeader, tracePropagation)
	}

	ack := http.Response{
		Status:        switchingProtocolText,
		StatusCode:    http.StatusSwitchingProtocols,
		ContentLength: -1,
		Header:        header,
	}
	return h.w.WriteRespHeaders(ack.StatusCode, ack.Header)
}
// localProxyConnection presents an io.ReadWriteCloser as a net.Conn so that an incoming connection to
// cloudflared can be emulated. It is used when handling a "hijacked" connection from
// connection.ResponseWriter.
type localProxyConnection struct {
	io.ReadWriteCloser
}

// Read reads from the wrapped stream.
func (c *localProxyConnection) Read(b []byte) (int, error) {
	n, err := c.ReadWriteCloser.Read(b)
	return n, err
}

// Write writes to the wrapped stream.
func (c *localProxyConnection) Write(b []byte) (int, error) {
	n, err := c.ReadWriteCloser.Write(b)
	return n, err
}

// Close closes the wrapped stream.
func (c *localProxyConnection) Close() error {
	err := c.ReadWriteCloser.Close()
	return err
}

// LocalAddr returns a placeholder address (IPv6 loopback, port 0); the value is not used.
func (c *localProxyConnection) LocalAddr() net.Addr {
	return &net.TCPAddr{IP: net.IPv6loopback}
}

// RemoteAddr returns a placeholder address (IPv6 loopback, port 0); the value is not used.
func (c *localProxyConnection) RemoteAddr() net.Addr {
	return &net.TCPAddr{IP: net.IPv6loopback}
}

// SetDeadline does nothing and reports success: read/write deadlines cannot be set for the tunnel back to
// origintunneld.
func (c *localProxyConnection) SetDeadline(_ time.Time) error {
	return nil
}

// SetReadDeadline does nothing and reports success, for the same reason as SetDeadline.
func (c *localProxyConnection) SetReadDeadline(_ time.Time) error {
	return nil
}

// SetWriteDeadline does nothing and reports success, for the same reason as SetDeadline.
func (c *localProxyConnection) SetWriteDeadline(_ time.Time) error {
	return nil
}
// ResponseWriter is the response path for a request back through cloudflared's tunnel.
// It combines the standard http.ResponseWriter, http.Hijacker and io.Writer with two tunnel-specific methods.
type ResponseWriter interface {
// WriteRespHeaders writes the response status code together with the given headers.
WriteRespHeaders(status int, header http.Header) error
// AddTrailer records a trailer name/value pair for the response.
AddTrailer(trailerName, trailerValue string)
http.ResponseWriter
http.Hijacker
io.Writer
}
// ConnectedFuse exposes a way to signal a connected state and to query it.
// NOTE(review): implementations are outside this file; presumably IsConnected reports true once
// Connected has been called — confirm.
type ConnectedFuse interface {
Connected()
IsConnected() bool
}
// shouldFlush reports whether a response with the given headers should be flushed on every write to a
// ResponseWriter.
//
// Some frameworks doing Server Side Events (SSE) do not respect the `Content-Type` header, so the content
// type alone is not a reliable signal. A response is therefore treated as a stream, and flushed right away
// to the eyeball, when any of the following holds:
//   - it has no `Content-Length`;
//   - its `Transfer-Encoding` contains `chunked`;
//   - its `Content-Type` starts with one of the flushable content types.
//
// The transfer-encoding and content-type comparisons are case-insensitive.
// References:
//   - https://datatracker.ietf.org/doc/html/rfc7230#section-4.1
//   - https://datatracker.ietf.org/doc/html/rfc9112#section-6.1
func shouldFlush(headers http.Header) bool {
	if headers.Get(contentLengthHeader) == "" {
		return true
	}

	encoding := strings.ToLower(headers.Get(transferEncodingHeader))
	if strings.Contains(encoding, chunkTransferEncoding) {
		return true
	}

	mediaType := strings.ToLower(headers.Get(contentTypeHeader))
	for i := range flushableContentTypes {
		if strings.HasPrefix(mediaType, flushableContentTypes[i]) {
			return true
		}
	}
	return false
}
// uint8ToString renders input as its base-10 string representation.
func uint8ToString(input uint8) string {
	return strconv.Itoa(int(input))
}
// FindCfRayHeader returns the first value of the request's Cf-Ray header, or an empty string when the
// header is absent.
func FindCfRayHeader(req *http.Request) string {
	headers := req.Header
	return headers.Get("Cf-Ray")
}
// IsLBProbeRequest reports whether the request's User-Agent starts with the load-balancer probe prefix.
func IsLBProbeRequest(req *http.Request) bool {
	userAgent := req.UserAgent()
	return strings.HasPrefix(userAgent, lbProbeUserAgentPrefix)
}