TUN-8861: Add session limiter to UDP session manager

## Summary
To make cloudflared's behavior more predictable and to prevent
resource exhaustion, we have decided to add session limits that
can be configured by the user. This first commit introduces the
session limiter and adds it to the UDP handling path. For now,
the limiter runs only in unlimited mode.
This commit is contained in:
João "Pisco" Fernandes
2025-01-20 02:52:32 -08:00
parent 8918b6729e
commit bf4954e96a
66 changed files with 3409 additions and 1184 deletions

View File

@@ -116,7 +116,7 @@ func (s *UDPSessionRegistrationDatagram) MarshalBinary() (data []byte, err error
data = make([]byte, sessionRegistrationIPv4DatagramHeaderLen+len(s.Payload))
}
data[0] = byte(UDPSessionRegistrationType)
data[1] = byte(flags)
data[1] = flags
binary.BigEndian.PutUint16(data[2:4], s.Dest.Port())
binary.BigEndian.PutUint16(data[4:6], uint16(s.IdleDurationHint.Seconds()))
err = s.RequestID.MarshalBinaryTo(data[6:22])
@@ -284,6 +284,8 @@ const (
ResponseDestinationUnreachable SessionRegistrationResp = 0x01
// Session registration was unable to bind to a local UDP socket.
ResponseUnableToBindSocket SessionRegistrationResp = 0x02
// Session registration failed due to the number of sessions being higher than the limit.
ResponseTooManyActiveSessions SessionRegistrationResp = 0x03
// Session registration failed with an unexpected error but provided a message.
ResponseErrorWithMsg SessionRegistrationResp = 0xff
)
@@ -311,6 +313,7 @@ func (s *UDPSessionRegistrationResponseDatagram) MarshalBinary() (data []byte, e
if len(s.ErrorMsg) > maxResponseErrorMessageLen {
return nil, wrapMarshalErr(ErrDatagramResponseMsgInvalidSize)
}
// nolint: gosec
errMsgLen := uint16(len(s.ErrorMsg))
data = make([]byte, datagramSessionRegistrationResponseLen+errMsgLen)

View File

@@ -7,6 +7,10 @@ import (
"sync"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/management"
cfdsession "github.com/cloudflare/cloudflared/session"
)
var (
@@ -16,6 +20,8 @@ var (
ErrSessionBoundToOtherConn = errors.New("flow is in use by another connection")
// ErrSessionAlreadyRegistered is returned when a registration already exists for this connection.
ErrSessionAlreadyRegistered = errors.New("flow is already registered for this connection")
// ErrSessionRegistrationRateLimited is returned when a registration fails due to rate limiting on the number of active sessions.
ErrSessionRegistrationRateLimited = errors.New("flow registration rate limited")
)
type SessionManager interface {
@@ -38,14 +44,16 @@ type sessionManager struct {
sessions map[RequestID]Session
mutex sync.RWMutex
originDialer DialUDP
limiter cfdsession.Limiter
metrics Metrics
log *zerolog.Logger
}
func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer DialUDP) SessionManager {
func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer DialUDP, limiter cfdsession.Limiter) SessionManager {
return &sessionManager{
sessions: make(map[RequestID]Session),
originDialer: originDialer,
limiter: limiter,
metrics: metrics,
log: log,
}
@@ -61,6 +69,12 @@ func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram
}
return nil, ErrSessionBoundToOtherConn
}
// Try to start a new session
if err := s.limiter.Acquire(management.UDP.String()); err != nil {
return nil, ErrSessionRegistrationRateLimited
}
// Attempt to bind the UDP socket for the new session
origin, err := s.originDialer(request.Dest)
if err != nil {
@@ -100,4 +114,5 @@ func (s *sessionManager) UnregisterSession(requestID RequestID) {
_ = session.Close()
}
delete(s.sessions, requestID)
s.limiter.Release()
}

View File

@@ -8,14 +8,19 @@ import (
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/cloudflare/cloudflared/mocks"
"github.com/cloudflare/cloudflared/ingress"
v3 "github.com/cloudflare/cloudflared/quic/v3"
cfdsession "github.com/cloudflare/cloudflared/session"
)
func TestRegisterSession(t *testing.T) {
log := zerolog.Nop()
manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort)
manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdsession.NewLimiter(0))
request := v3.UDPSessionRegistrationDatagram{
RequestID: testRequestID,
@@ -71,10 +76,32 @@ func TestRegisterSession(t *testing.T) {
// TestGetSession_Empty verifies that looking up a request ID on a freshly constructed session
// manager, on which nothing has been registered, fails with ErrSessionNotFound.
func TestGetSession_Empty(t *testing.T) {
log := zerolog.Nop()
manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort)
// NOTE(review): NewLimiter(0) presumably selects the "unlimited" mode mentioned in the commit summary — confirm.
manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdsession.NewLimiter(0))
_, err := manager.GetSession(testRequestID)
// The lookup must fail specifically with ErrSessionNotFound; any other outcome aborts the test.
if !errors.Is(err, v3.ErrSessionNotFound) {
t.Fatalf("get session find no session: %v", err)
}
}
// TestRegisterSessionRateLimit checks that RegisterSession reports
// ErrSessionRegistrationRateLimited when the limiter refuses the acquire.
func TestRegisterSessionRateLimit(t *testing.T) {
	logger := zerolog.Nop()
	mockCtrl := gomock.NewController(t)

	// The mocked limiter rejects the single expected Acquire("udp") call.
	// Release is expected zero times: the test fails if the manager calls it.
	limiter := mocks.NewMockLimiter(mockCtrl)
	limiter.EXPECT().Acquire("udp").Return(cfdsession.ErrTooManyActiveSessions)
	limiter.EXPECT().Release().Times(0)

	sessions := v3.NewSessionManager(&noopMetrics{}, &logger, ingress.DialUDPAddrPort, limiter)

	registration := &v3.UDPSessionRegistrationDatagram{
		RequestID:        testRequestID,
		Dest:             netip.MustParseAddrPort("127.0.0.1:5000"),
		Traced:           false,
		IdleDurationHint: 5 * time.Second,
		Payload:          nil,
	}

	_, err := sessions.RegisterSession(registration, &noopEyeball{})
	require.ErrorIs(t, err, v3.ErrSessionRegistrationRateLimited)
}

View File

@@ -143,8 +143,6 @@ func (c *datagramConn) SendICMPTTLExceed(icmp *packet.ICMP, rawPacket packet.Raw
return c.SendICMPPacket(c.icmpRouter.ConvertToTTLExceeded(icmp, rawPacket))
}
var errReadTimeout error = errors.New("receive datagram timeout")
// pollDatagrams will read datagrams from the underlying connection until the provided context is done.
func (c *datagramConn) pollDatagrams(ctx context.Context) {
for ctx.Err() == nil {
@@ -256,8 +254,12 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
// Session is already registered but to a different connection
c.handleSessionMigration(datagram.RequestID, &log)
return
case ErrSessionRegistrationRateLimited:
// There are too many concurrent sessions so we return an error to force a retry later
c.handleSessionRegistrationRateLimited(datagram, &log)
return
default:
log.Err(err).Msgf("flow registration failure")
log.Err(err).Msg("flow registration failure")
c.handleSessionRegistrationFailure(datagram.RequestID, &log)
return
}
@@ -278,7 +280,7 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
// [Session.Serve] is blocking and will continue this go routine till the end of the session lifetime.
start := time.Now()
err = session.Serve(ctx)
elapsedMS := time.Now().Sub(start).Milliseconds()
elapsedMS := time.Since(start).Milliseconds()
log = log.With().Int64(logDurationKey, elapsedMS).Logger()
if err == nil {
// We typically don't expect a session to close without some error response. [SessionIdleErr] is the typical
@@ -346,6 +348,16 @@ func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, log
}
}
// handleSessionRegistrationRateLimited handles a session registration that was rejected because
// too many sessions are already being handled. It logs a warning and answers the registration's
// RequestID with ResponseTooManyActiveSessions. A failure to send that response is logged and
// otherwise ignored; nothing is returned to the caller.
//
// Both log lines are written through the caller-supplied logger, so the warning carries the same
// context as the send-failure message.
// NOTE(review): assumes logger is a contextual child of c.logger (e.g. tagged with flow fields) — confirm at the call site.
func (c *datagramConn) handleSessionRegistrationRateLimited(datagram *UDPSessionRegistrationDatagram, logger *zerolog.Logger) {
	logger.Warn().Msg("Too many concurrent sessions being handled, rejecting udp proxy")

	rateLimitResponse := ResponseTooManyActiveSessions
	// err is only needed for the failure branch, so scope it to the if statement.
	if err := c.SendUDPSessionResponse(datagram.RequestID, rateLimitResponse); err != nil {
		logger.Err(err).Msgf("unable to send flow registration error response (%d)", rateLimitResponse)
	}
}
// Handles incoming datagrams that need to be sent to a registered session.
func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram, logger *zerolog.Logger) {
s, err := c.sessionManager.GetSession(datagram.RequestID)

View File

@@ -13,16 +13,17 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/google/gopacket/layers"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/packet"
v3 "github.com/cloudflare/cloudflared/quic/v3"
cfdsession "github.com/cloudflare/cloudflared/session"
)
type noopEyeball struct {
@@ -87,7 +88,7 @@ func (m *mockEyeball) SendICMPTTLExceed(icmp *packet.ICMP, rawPacket packet.RawP
func TestDatagramConn_New(t *testing.T) {
log := zerolog.Nop()
conn := v3.NewDatagramConn(newMockQuicConn(), v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
conn := v3.NewDatagramConn(newMockQuicConn(), v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdsession.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
if conn == nil {
t.Fatal("expected valid connection")
}
@@ -96,10 +97,12 @@ func TestDatagramConn_New(t *testing.T) {
func TestDatagramConn_SendUDPSessionDatagram(t *testing.T) {
log := zerolog.Nop()
quic := newMockQuicConn()
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdsession.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
payload := []byte{0xef, 0xef}
conn.SendUDPSessionDatagram(payload)
err := conn.SendUDPSessionDatagram(payload)
require.NoError(t, err)
p := <-quic.recv
if !slices.Equal(p, payload) {
t.Fatal("datagram sent does not match datagram received on quic side")
@@ -109,15 +112,16 @@ func TestDatagramConn_SendUDPSessionDatagram(t *testing.T) {
func TestDatagramConn_SendUDPSessionResponse(t *testing.T) {
log := zerolog.Nop()
quic := newMockQuicConn()
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdsession.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
err := conn.SendUDPSessionResponse(testRequestID, v3.ResponseDestinationUnreachable)
require.NoError(t, err)
conn.SendUDPSessionResponse(testRequestID, v3.ResponseDestinationUnreachable)
resp := <-quic.recv
var response v3.UDPSessionRegistrationResponseDatagram
err := response.UnmarshalBinary(resp)
if err != nil {
t.Fatal(err)
}
err = response.UnmarshalBinary(resp)
require.NoError(t, err)
expected := v3.UDPSessionRegistrationResponseDatagram{
RequestID: testRequestID,
ResponseType: v3.ResponseDestinationUnreachable,
@@ -130,7 +134,7 @@ func TestDatagramConn_SendUDPSessionResponse(t *testing.T) {
func TestDatagramConnServe_ApplicationClosed(t *testing.T) {
log := zerolog.Nop()
quic := newMockQuicConn()
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdsession.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
@@ -146,7 +150,7 @@ func TestDatagramConnServe_ConnectionClosed(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
quic.ctx = ctx
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdsession.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
err := conn.Serve(context.Background())
if !errors.Is(err, context.DeadlineExceeded) {
@@ -157,7 +161,7 @@ func TestDatagramConnServe_ConnectionClosed(t *testing.T) {
func TestDatagramConnServe_ReceiveDatagramError(t *testing.T) {
log := zerolog.Nop()
quic := &mockQuicConnReadError{err: net.ErrClosed}
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort, cfdsession.NewLimiter(0)), &noopICMPRouter{}, 0, &noopMetrics{}, &log)
err := conn.Serve(context.Background())
if !errors.Is(err, net.ErrClosed) {
@@ -165,6 +169,38 @@ func TestDatagramConnServe_ReceiveDatagramError(t *testing.T) {
}
}
// TestDatagramConnServe_SessionRegistrationRateLimit drives a datagram connection whose session
// manager is configured with ErrSessionRegistrationRateLimited and checks that the reply to a
// session registration carries ResponseTooManyActiveSessions for the same request ID.
func TestDatagramConnServe_SessionRegistrationRateLimit(t *testing.T) {
	logger := zerolog.Nop()
	quicConn := newMockQuicConn()
	// NOTE(review): expectedRegErr is presumably what the mock returns from RegisterSession — confirm against mockSessionManager.
	rateLimitedManager := &mockSessionManager{
		expectedRegErr: v3.ErrSessionRegistrationRateLimited,
	}
	conn := v3.NewDatagramConn(quicConn, rateLimitedManager, &noopICMPRouter{}, 0, &noopMetrics{}, &logger)

	// Run the muxer in the background until the test's context is cancelled.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Buffered with capacity 1 so the goroutine can deliver Serve's result even though nothing reads it.
	serveResult := make(chan error, 1)
	go func() {
		serveResult <- conn.Serve(ctx)
	}()

	// Feed a new session registration into the connection.
	quicConn.send <- newRegisterSessionDatagram(testRequestID)

	// Read back the registration response and decode it.
	rawResponse := <-quicConn.recv
	var response v3.UDPSessionRegistrationResponseDatagram
	if err := response.UnmarshalBinary(rawResponse); err != nil {
		t.Fatal(err)
	}

	require.EqualValues(t, testRequestID, response.RequestID)
	require.EqualValues(t, v3.ResponseTooManyActiveSessions, response.ResponseType)
}
func TestDatagramConnServe_ErrorDatagramTypes(t *testing.T) {
for _, test := range []struct {
name string
@@ -354,11 +390,9 @@ func TestDatagramConnServeDecodeMultipleICMPInParallel(t *testing.T) {
var receivedPackets []*packet.ICMP
go func() {
for ctx.Err() == nil {
select {
case icmpPacket := <-router.recv:
receivedPackets = append(receivedPackets, icmpPacket)
wg.Done()
}
icmpPacket := <-router.recv
receivedPackets = append(receivedPackets, icmpPacket)
wg.Done()
}
}()
@@ -677,7 +711,7 @@ func TestDatagramConnServe_ICMPDatagram_TTLExceeded(t *testing.T) {
datagram := newICMPDatagram(expectedICMP)
quic.send <- datagram
// Origin should not recieve a packet
// Origin should not receive a packet
select {
case <-router.recv:
t.Fatalf("TTL should be expired and no origin ICMP sent")
@@ -719,18 +753,6 @@ func newRegisterSessionDatagram(id v3.RequestID) []byte {
return payload
}
// newRegisterResponseSessionDatagram returns the binary encoding of a session registration
// response for the given request ID and response type. It panics if marshalling fails.
func newRegisterResponseSessionDatagram(id v3.RequestID, resp v3.SessionRegistrationResp) []byte {
	response := v3.UDPSessionRegistrationResponseDatagram{RequestID: id, ResponseType: resp}
	encoded, marshalErr := response.MarshalBinary()
	if marshalErr != nil {
		panic(marshalErr)
	}
	return encoded
}
func newSessionPayloadDatagram(id v3.RequestID, payload []byte) []byte {
datagram := make([]byte, len(payload)+17)
err := v3.MarshalPayloadHeaderTo(id, datagram[:])