From b4a98b13fe9141ee30702e15f076c3a4a345c497 Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Mon, 23 Jun 2025 18:01:15 +0000 Subject: [PATCH] TUN-9469: Centralize UDP origin proxy dialing as ingress service Introduces a new `UDPOriginProxy` interface and `UDPOriginService` to standardize how UDP connections are dialed to origins. Allows for future overrides of the ingress service for specific dial destinations. Simplifies dependency injection for UDP dialing throughout both datagram v2 and v3 by using the same ingress service. Previous invocations called into a DialUDP function in the ingress package that was a light wrapper over `net.DialUDP`. Now a reference is passed into both datagram controllers that allows more control over the DialUDP method. Closes TUN-9469 --- connection/quic_connection_test.go | 1 + connection/quic_datagram_v2.go | 34 ++++++++++++++-- connection/quic_datagram_v2_test.go | 2 + ingress/origin_udp_proxy.go | 60 +++++++++++++++++++---------- quic/v3/manager.go | 11 ++---- quic/v3/manager_test.go | 6 +-- quic/v3/muxer_test.go | 12 +++--- supervisor/supervisor.go | 5 ++- supervisor/tunnel.go | 2 + 9 files changed, 93 insertions(+), 40 deletions(-) diff --git a/connection/quic_connection_test.go b/connection/quic_connection_test.go index 251f8630..6e85fcd7 100644 --- a/connection/quic_connection_test.go +++ b/connection/quic_connection_test.go @@ -830,6 +830,7 @@ func testTunnelConnection(t *testing.T, serverAddr netip.AddrPort, index uint8) sessionManager, cfdflow.NewLimiter(0), datagramMuxer, + ingress.DefaultUDPDialer, packetRouter, 15 * time.Second, 0 * time.Second, diff --git a/connection/quic_datagram_v2.go b/connection/quic_datagram_v2.go index 01e13466..94252551 100644 --- a/connection/quic_datagram_v2.go +++ b/connection/quic_datagram_v2.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "net" + "net/netip" "time" "github.com/google/uuid" + "github.com/pkg/errors" pkgerrors "github.com/pkg/errors" "github.com/quic-go/quic-go" "github.com/rs/zerolog" @@ -32,6 +34,10 @@ const ( demuxChanCapacity = 16 ) +var ( + errInvalidDestinationIP = errors.New("unable to parse destination IP") +) + // DatagramSessionHandler is a service that can serve datagrams for a connection and handle sessions from incoming // connection streams. type DatagramSessionHandler interface { @@ -51,7 +57,10 @@ type datagramV2Connection struct { // datagramMuxer mux/demux datagrams from quic connection datagramMuxer *cfdquic.DatagramMuxerV2 - packetRouter *ingress.PacketRouter + // ingressUDPProxy acts as the origin dialer for UDP requests + ingressUDPProxy ingress.UDPOriginProxy + // packetRouter acts as the origin router for ICMP requests + packetRouter *ingress.PacketRouter rpcTimeout time.Duration streamWriteTimeout time.Duration @@ -61,6 +70,7 @@ type datagramV2Connection struct { func NewDatagramV2Connection(ctx context.Context, conn quic.Connection, + ingressUDPProxy ingress.UDPOriginProxy, icmpRouter ingress.ICMPRouter, index uint8, rpcTimeout time.Duration, @@ -79,6 +89,7 @@ func NewDatagramV2Connection(ctx context.Context, sessionManager: sessionManager, flowLimiter: flowLimiter, datagramMuxer: datagramMuxer, + ingressUDPProxy: ingressUDPProxy, packetRouter: packetRouter, rpcTimeout: rpcTimeout, streamWriteTimeout: streamWriteTimeout, @@ -128,12 +139,29 @@ func (q *datagramV2Connection) RegisterUdpSession(ctx context.Context, sessionID tracing.EndWithErrorStatus(registerSpan, err) return nil, err } + // We need to force the net.IP to IPv4 (if it's an IPv4 address) otherwise the net.IP conversion from capnp + // will be a IPv4-mapped-IPv6 address. + // In the case that the address is IPv6 we leave it untouched and parse it as normal. + ip := dstIP.To4() + if ip == nil { + ip = dstIP + } + // Parse the dstIP and dstPort into a netip.AddrPort + // This should never fail because the IP was already parsed as a valid net.IP + destAddr, ok := netip.AddrFromSlice(ip) + if !ok { + log.Err(errInvalidDestinationIP).Msgf("Failed to parse destination proxy IP: %s", ip) + tracing.EndWithErrorStatus(registerSpan, errInvalidDestinationIP) + q.flowLimiter.Release() + return nil, errInvalidDestinationIP + } + dstAddrPort := netip.AddrPortFrom(destAddr, dstPort) // Each session is a series of datagram from an eyeball to a dstIP:dstPort. // (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket. - originProxy, err := ingress.DialUDP(dstIP, dstPort) + originProxy, err := q.ingressUDPProxy.DialUDP(dstAddrPort) if err != nil { - log.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort) + log.Err(err).Msgf("Failed to create udp proxy to %s", dstAddrPort) tracing.EndWithErrorStatus(registerSpan, err) q.flowLimiter.Release() return nil, err diff --git a/connection/quic_datagram_v2_test.go b/connection/quic_datagram_v2_test.go index af58ffdb..7e1f2f95 100644 --- a/connection/quic_datagram_v2_test.go +++ b/connection/quic_datagram_v2_test.go @@ -13,6 +13,7 @@ import ( "go.uber.org/mock/gomock" cfdflow "github.com/cloudflare/cloudflared/flow" + "github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/mocks" ) @@ -83,6 +84,7 @@ func TestRateLimitOnNewDatagramV2UDPSession(t *testing.T) { datagramConn := NewDatagramV2Connection( t.Context(), conn, + ingress.DefaultUDPDialer, nil, 0, 0*time.Second, diff --git a/ingress/origin_udp_proxy.go b/ingress/origin_udp_proxy.go index 012c05c0..eab7d783 100644 --- a/ingress/origin_udp_proxy.go +++ b/ingress/origin_udp_proxy.go @@ -2,37 +2,57 @@ package ingress import ( "fmt" - "io" "net" "net/netip" + + "github.com/rs/zerolog" ) -type UDPProxy interface { - io.ReadWriteCloser - LocalAddr() net.Addr +// UDPOriginService provides a proxy UDP dialer to origin services while allowing reserved +// services to be provided. These reserved services are assigned to specific [netip.AddrPort]s +// and provide their own [UDPOriginProxy]s to handle UDP origin dialing. +type UDPOriginService struct { + // Reserved services for reserved AddrPort values + reservedServices map[netip.AddrPort]UDPOriginProxy + // The default UDP Dialer used if no reserved services are found for an origin request. + defaultDialer UDPOriginProxy + + logger *zerolog.Logger } -type udpProxy struct { - *net.UDPConn +// UDPOriginProxy provides a UDP dial operation to a requested addr. +type UDPOriginProxy interface { + DialUDP(addr netip.AddrPort) (*net.UDPConn, error) } -func DialUDP(dstIP net.IP, dstPort uint16) (UDPProxy, error) { - dstAddr := &net.UDPAddr{ - IP: dstIP, - Port: int(dstPort), +func NewUDPOriginService(reserved map[netip.AddrPort]UDPOriginProxy, logger *zerolog.Logger) *UDPOriginService { + return &UDPOriginService{ + reservedServices: reserved, + defaultDialer: DefaultUDPDialer, + logger: logger, } - - // We use nil as local addr to force runtime to find the best suitable local address IP given the destination - // address as context. - udpConn, err := net.DialUDP("udp", nil, dstAddr) - if err != nil { - return nil, fmt.Errorf("unable to create UDP proxy to origin (%v:%v): %w", dstIP, dstPort, err) - } - - return &udpProxy{udpConn}, nil } -func DialUDPAddrPort(dest netip.AddrPort) (*net.UDPConn, error) { +// SetUDPDialer updates the default UDP Dialer used. +// Typically used in unit testing. +func (s *UDPOriginService) SetDefaultDialer(dialer UDPOriginProxy) { + s.defaultDialer = dialer +} + +// DialUDP will perform a dial UDP to the requested addr. +func (s *UDPOriginService) DialUDP(addr netip.AddrPort) (*net.UDPConn, error) { + // Check to see if any reserved services are available for this addr and call their dialer instead. + if dialer, ok := s.reservedServices[addr]; ok { + return dialer.DialUDP(addr) + } + return s.defaultDialer.DialUDP(addr) +} + +type defaultUDPDialer struct{} + +var DefaultUDPDialer UDPOriginProxy = &defaultUDPDialer{} + +func (d *defaultUDPDialer) DialUDP(dest netip.AddrPort) (*net.UDPConn, error) { addr := net.UDPAddrFromAddrPort(dest) // We use nil as local addr to force runtime to find the best suitable local address IP given the destination diff --git a/quic/v3/manager.go b/quic/v3/manager.go index c32cc563..a22adcea 100644 --- a/quic/v3/manager.go +++ b/quic/v3/manager.go @@ -2,12 +2,11 @@ package v3 import ( "errors" - "net" - "net/netip" "sync" "github.com/rs/zerolog" + "github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/management" cfdflow "github.com/cloudflare/cloudflared/flow" @@ -38,18 +37,16 @@ type SessionManager interface { UnregisterSession(requestID RequestID) } -type DialUDP func(dest netip.AddrPort) (*net.UDPConn, error) - type sessionManager struct { sessions map[RequestID]Session mutex sync.RWMutex - originDialer DialUDP + originDialer ingress.UDPOriginProxy limiter cfdflow.Limiter metrics Metrics log *zerolog.Logger } -func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer DialUDP, limiter cfdflow.Limiter) SessionManager { +func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer ingress.UDPOriginProxy, limiter cfdflow.Limiter) SessionManager { return &sessionManager{ sessions: make(map[RequestID]Session), originDialer: originDialer, @@ -76,7 +73,7 @@ func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram } // Attempt to bind the UDP socket for the new session - origin, err := s.originDialer(request.Dest) + origin, err := s.originDialer.DialUDP(request.Dest) if err != nil { return nil, err } diff --git a/quic/v3/manager_test.go b/quic/v3/manager_test.go index 759b08c6..e6335b6a 100644 --- a/quic/v3/manager_test.go +++ b/quic/v3/manager_test.go @@ -20,7 +20,7 @@ import ( func TestRegisterSession(t *testing.T) { log := zerolog.Nop() - manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdflow.NewLimiter(0)) + manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, cfdflow.NewLimiter(0)) request := v3.UDPSessionRegistrationDatagram{ RequestID: testRequestID, @@ -76,7 +76,7 @@ func TestRegisterSession(t *testing.T) { func TestGetSession_Empty(t *testing.T) { log := zerolog.Nop() - manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdflow.NewLimiter(0)) + manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, cfdflow.NewLimiter(0)) _, err := manager.GetSession(testRequestID) if !errors.Is(err, v3.ErrSessionNotFound) { @@ -93,7 +93,7 @@ func TestRegisterSessionRateLimit(t *testing.T) { flowLimiterMock.EXPECT().Acquire("udp").Return(cfdflow.ErrTooManyActiveFlows) flowLimiterMock.EXPECT().Release().Times(0) - manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, flowLimiterMock) + manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, flowLimiterMock) request := v3.UDPSessionRegistrationDatagram{ RequestID: testRequestID, diff --git a/quic/v3/muxer_test.go b/quic/v3/muxer_test.go index 555489f5..e3ca81c0 100644 --- a/quic/v3/muxer_test.go +++ b/quic/v3/muxer_test.go @@ -88,7 +88,7 @@ func (m *mockEyeball) SendICMPTTLExceed(icmp *packet.ICMP, rawPacket packet.RawP func TestDatagramConn_New(t *testing.T) { log := zerolog.Nop() - conn := v3.NewDatagramConn(newMockQuicConn(), v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) + conn := v3.NewDatagramConn(newMockQuicConn(), v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) if conn == nil { t.Fatal("expected valid connection") } @@ -97,7 +97,7 @@ func TestDatagramConn_New(t *testing.T) { func TestDatagramConn_SendUDPSessionDatagram(t *testing.T) { log := zerolog.Nop() quic := newMockQuicConn() - conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) payload := []byte{0xef, 0xef} err := conn.SendUDPSessionDatagram(payload) @@ -112,7 +112,7 @@ func TestDatagramConn_SendUDPSessionDatagram(t *testing.T) { func TestDatagramConn_SendUDPSessionResponse(t *testing.T) { log := zerolog.Nop() quic := newMockQuicConn() - conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) err := conn.SendUDPSessionResponse(testRequestID, v3.ResponseDestinationUnreachable) require.NoError(t, err) @@ -134,7 +134,7 @@ func TestDatagramConn_SendUDPSessionResponse(t *testing.T) { func TestDatagramConnServe_ApplicationClosed(t *testing.T) { log := zerolog.Nop() quic := newMockQuicConn() - conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) ctx, cancel := context.WithTimeout(t.Context(), 1*time.Second) defer cancel() @@ -150,7 +150,7 @@ func TestDatagramConnServe_ConnectionClosed(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 1*time.Second) defer cancel() quic.ctx = ctx - conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) err := conn.Serve(t.Context()) if !errors.Is(err, context.DeadlineExceeded) { @@ -161,7 +161,7 @@ func TestDatagramConnServe_ConnectionClosed(t *testing.T) { func TestDatagramConnServe_ReceiveDatagramError(t *testing.T) { log := zerolog.Nop() quic := &mockQuicConnReadError{err: net.ErrClosed} - conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DefaultUDPDialer, cfdflow.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log) err := conn.Serve(t.Context()) if !errors.Is(err, net.ErrClosed) { diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index df8bbd46..965b8c0d 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -78,11 +78,14 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato edgeBindAddr := config.EdgeBindAddr datagramMetrics := v3.NewMetrics(prometheus.DefaultRegisterer) - sessionManager := v3.NewSessionManager(datagramMetrics, config.Log, ingress.DialUDPAddrPort, orchestrator.GetFlowLimiter()) + // No reserved ingress services for now, hence the nil + ingressUDPService := ingress.NewUDPOriginService(nil, config.Log) + sessionManager := v3.NewSessionManager(datagramMetrics, config.Log, ingressUDPService, orchestrator.GetFlowLimiter()) edgeTunnelServer := EdgeTunnelServer{ config: config, orchestrator: orchestrator, + ingressUDPProxy: ingressUDPService, sessionManager: sessionManager, datagramMetrics: datagramMetrics, edgeAddrs: edgeIPs, diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index c708c944..18c294c5 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -166,6 +166,7 @@ func (f *ipAddrFallback) ShouldGetNewAddress(connIndex uint8, err error) (needsN type EdgeTunnelServer struct { config *TunnelConfig orchestrator *orchestration.Orchestrator + ingressUDPProxy ingress.UDPOriginProxy sessionManager v3.SessionManager datagramMetrics v3.Metrics edgeAddrHandler EdgeAddrHandler @@ -613,6 +614,7 @@ func (e *EdgeTunnelServer) serveQUIC( datagramSessionManager = connection.NewDatagramV2Connection( ctx, conn, + e.ingressUDPProxy, e.config.ICMPRouterServer, connIndex, e.config.RPCTimeout,