mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 16:49:57 +00:00
ZTC-234: Replace ICMP funnels when ingress connection changes
Origintunneld has been observed to continue sending reply packets to the first incoming connection it received, even if a newer connection is observed to be sending the requests. OTD uses the funnel library from cloudflared, which is why the changes are here. In theory, cloudflared has the same type of bug where a ping session switching between quic connections will continue sending replies to the first connection. This bug has not been tested or confirmed though, but this PR will fix if it exists.
This commit is contained in:
@@ -157,6 +157,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
}
|
||||
span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID)))
|
||||
|
||||
shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID)
|
||||
newFunnelFunc := func() (packet.Funnel, error) {
|
||||
originalEcho, err := getICMPEcho(pk.Message)
|
||||
if err != nil {
|
||||
@@ -170,7 +171,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
return icmpFlow, nil
|
||||
}
|
||||
funnelID := echoFunnelID(assignedEchoID)
|
||||
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc)
|
||||
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, shouldReplaceFunnelFunc, newFunnelFunc)
|
||||
if err != nil {
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return err
|
||||
|
@@ -112,6 +112,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
attribute.Int("seq", originalEcho.Seq),
|
||||
)
|
||||
|
||||
shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID)
|
||||
newFunnelFunc := func() (packet.Funnel, error) {
|
||||
conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone)
|
||||
if err != nil {
|
||||
@@ -137,7 +138,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
dstIP: pk.Dst,
|
||||
originalEchoID: originalEcho.ID,
|
||||
}
|
||||
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc)
|
||||
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, shouldReplaceFunnelFunc, newFunnelFunc)
|
||||
if err != nil {
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return err
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
"net/netip"
|
||||
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/rs/zerolog"
|
||||
"golang.org/x/net/icmp"
|
||||
|
||||
"github.com/cloudflare/cloudflared/packet"
|
||||
@@ -174,3 +175,30 @@ func toICMPEchoFlow(funnel packet.Funnel) (*icmpEchoFlow, error) {
|
||||
}
|
||||
return icmpFlow, nil
|
||||
}
|
||||
|
||||
func createShouldReplaceFunnelFunc(logger *zerolog.Logger, muxer muxer, pk *packet.ICMP, originalEchoID int) func(packet.Funnel) bool {
|
||||
return func(existing packet.Funnel) bool {
|
||||
existingFlow, err := toICMPEchoFlow(existing)
|
||||
if err != nil {
|
||||
logger.Err(err).
|
||||
Str("src", pk.Src.String()).
|
||||
Str("dst", pk.Dst.String()).
|
||||
Int("originalEchoID", originalEchoID).
|
||||
Msg("Funnel of wrong type found")
|
||||
return true
|
||||
}
|
||||
// Each quic connection should have a unique muxer.
|
||||
// If the existing flow has a different muxer, there's a new quic connection where return packets should be
|
||||
// routed. Otherwise, return packets will be send to the first observed incoming connection, rather than the
|
||||
// most recently observed connection.
|
||||
if existingFlow.responder.datagramMuxer != muxer {
|
||||
logger.Debug().
|
||||
Str("src", pk.Src.String()).
|
||||
Str("dst", pk.Dst.String()).
|
||||
Int("originalEchoID", originalEchoID).
|
||||
Msg("Replacing funnel with new responder")
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@@ -52,18 +52,28 @@ func TestFunnelIdleTimeout(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
funnelID := flow3Tuple{
|
||||
srcIP: pk.Src,
|
||||
dstIP: pk.Dst,
|
||||
originalEchoID: echoID,
|
||||
}
|
||||
muxer := newMockMuxer(0)
|
||||
responder := packetResponder{
|
||||
datagramMuxer: muxer,
|
||||
}
|
||||
require.NoError(t, proxy.Request(ctx, &pk, &responder))
|
||||
validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
|
||||
funnel1, found := proxy.srcFunnelTracker.Get(funnelID)
|
||||
require.True(t, found)
|
||||
|
||||
// Send second request, should reuse the funnel
|
||||
require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{
|
||||
datagramMuxer: nil,
|
||||
datagramMuxer: muxer,
|
||||
}))
|
||||
validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
|
||||
funnel2, found := proxy.srcFunnelTracker.Get(funnelID)
|
||||
require.True(t, found)
|
||||
require.Equal(t, funnel1, funnel2)
|
||||
|
||||
time.Sleep(idleTimeout * 2)
|
||||
newMuxer := newMockMuxer(0)
|
||||
|
Reference in New Issue
Block a user