mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 16:39:58 +00:00
TUN-8158: Bring back commit e653741885
and fixes infinite loop on linux when the socket is closed
This commit is contained in:
@@ -131,7 +131,7 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle
|
||||
}
|
||||
|
||||
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
|
||||
ctx, span := responder.requestSpan(ctx, pk)
|
||||
_, span := responder.requestSpan(ctx, pk)
|
||||
defer responder.exportSpan()
|
||||
|
||||
originalEcho, err := getICMPEcho(pk.Message)
|
||||
@@ -139,10 +139,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return err
|
||||
}
|
||||
span.SetAttributes(
|
||||
attribute.Int("originalEchoID", originalEcho.ID),
|
||||
attribute.Int("seq", originalEcho.Seq),
|
||||
)
|
||||
observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq)
|
||||
|
||||
echoIDTrackerKey := flow3Tuple{
|
||||
srcIP: pk.Src,
|
||||
dstIP: pk.Dst,
|
||||
@@ -189,6 +187,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = icmpFlow.sendToDst(pk.Dst, pk.Message)
|
||||
if err != nil {
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
@@ -269,15 +268,12 @@ func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
|
||||
_, span := icmpFlow.responder.replySpan(ctx, ip.logger)
|
||||
defer icmpFlow.responder.exportSpan()
|
||||
|
||||
span.SetAttributes(
|
||||
attribute.String("dst", reply.from.String()),
|
||||
attribute.Int("echoID", reply.echo.ID),
|
||||
attribute.Int("seq", reply.echo.Seq),
|
||||
attribute.Int("originalEchoID", icmpFlow.originalEchoID),
|
||||
)
|
||||
if err := icmpFlow.returnToSrc(reply); err != nil {
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return err
|
||||
}
|
||||
observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq)
|
||||
span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID))
|
||||
tracing.End(span)
|
||||
return nil
|
||||
}
|
||||
|
@@ -107,10 +107,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return err
|
||||
}
|
||||
span.SetAttributes(
|
||||
attribute.Int("originalEchoID", originalEcho.ID),
|
||||
attribute.Int("seq", originalEcho.Seq),
|
||||
)
|
||||
observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq)
|
||||
|
||||
shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID)
|
||||
newFunnelFunc := func() (packet.Funnel, error) {
|
||||
@@ -156,14 +153,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
Int("originalEchoID", originalEcho.ID).
|
||||
Msg("New flow")
|
||||
go func() {
|
||||
defer ip.srcFunnelTracker.Unregister(funnelID, icmpFlow)
|
||||
if err := ip.listenResponse(ctx, icmpFlow); err != nil {
|
||||
ip.logger.Debug().Err(err).
|
||||
Str("src", pk.Src.String()).
|
||||
Str("dst", pk.Dst.String()).
|
||||
Int("originalEchoID", originalEcho.ID).
|
||||
Msg("Failed to listen for ICMP echo response")
|
||||
}
|
||||
ip.listenResponse(ctx, icmpFlow)
|
||||
ip.srcFunnelTracker.Unregister(funnelID, icmpFlow)
|
||||
}()
|
||||
}
|
||||
if err := icmpFlow.sendToDst(pk.Dst, pk.Message); err != nil {
|
||||
@@ -179,17 +170,17 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) error {
|
||||
func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) {
|
||||
buf := make([]byte, mtu)
|
||||
for {
|
||||
retryable, err := ip.handleResponse(ctx, flow, buf)
|
||||
if err != nil && !retryable {
|
||||
return err
|
||||
if done := ip.handleResponse(ctx, flow, buf); done {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (retryableErr bool, err error) {
|
||||
// Listens for ICMP response and handles error logging
|
||||
func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (done bool) {
|
||||
_, span := flow.responder.replySpan(ctx, ip.logger)
|
||||
defer flow.responder.exportSpan()
|
||||
|
||||
@@ -199,33 +190,36 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf
|
||||
|
||||
n, from, err := flow.originConn.ReadFrom(buf)
|
||||
if err != nil {
|
||||
if flow.IsClosed() {
|
||||
tracing.EndWithErrorStatus(span, fmt.Errorf("flow was closed"))
|
||||
return true
|
||||
}
|
||||
ip.logger.Error().Err(err).Str("socket", flow.originConn.LocalAddr().String()).Msg("Failed to read from ICMP socket")
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return false, err
|
||||
return true
|
||||
}
|
||||
reply, err := parseReply(from, buf[:n])
|
||||
if err != nil {
|
||||
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply")
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return true, err
|
||||
return false
|
||||
}
|
||||
if !isEchoReply(reply.msg) {
|
||||
err := fmt.Errorf("Expect ICMP echo reply, got %s", reply.msg.Type)
|
||||
ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return true, err
|
||||
return false
|
||||
}
|
||||
span.SetAttributes(
|
||||
attribute.String("dst", reply.from.String()),
|
||||
attribute.Int("echoID", reply.echo.ID),
|
||||
attribute.Int("seq", reply.echo.Seq),
|
||||
)
|
||||
|
||||
if err := flow.returnToSrc(reply); err != nil {
|
||||
ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
|
||||
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
|
||||
tracing.EndWithErrorStatus(span, err)
|
||||
return true, err
|
||||
return false
|
||||
}
|
||||
|
||||
observeICMPReply(ip.logger, span, from.String(), reply.echo.ID, reply.echo.Seq)
|
||||
tracing.End(span)
|
||||
return true, nil
|
||||
return false
|
||||
}
|
||||
|
||||
// Only linux uses flow3Tuple as FunnelID
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/rs/zerolog"
|
||||
@@ -46,6 +47,7 @@ type flow3Tuple struct {
|
||||
type icmpEchoFlow struct {
|
||||
*packet.ActivityTracker
|
||||
closeCallback func() error
|
||||
closed *atomic.Bool
|
||||
src netip.Addr
|
||||
originConn *icmp.PacketConn
|
||||
responder *packetResponder
|
||||
@@ -59,6 +61,7 @@ func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icm
|
||||
return &icmpEchoFlow{
|
||||
ActivityTracker: packet.NewActivityTracker(),
|
||||
closeCallback: closeCallback,
|
||||
closed: &atomic.Bool{},
|
||||
src: src,
|
||||
originConn: originConn,
|
||||
responder: responder,
|
||||
@@ -86,9 +89,14 @@ func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool {
|
||||
}
|
||||
|
||||
func (ief *icmpEchoFlow) Close() error {
|
||||
ief.closed.Store(true)
|
||||
return ief.closeCallback()
|
||||
}
|
||||
|
||||
func (ief *icmpEchoFlow) IsClosed() bool {
|
||||
return ief.closed.Load()
|
||||
}
|
||||
|
||||
// sendToDst rewrites the echo ID to the one assigned to this flow
|
||||
func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error {
|
||||
ief.UpdateLastActive()
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fortytw2/leaktest"
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -18,6 +19,8 @@ import (
|
||||
)
|
||||
|
||||
func TestFunnelIdleTimeout(t *testing.T) {
|
||||
defer leaktest.Check(t)()
|
||||
|
||||
const (
|
||||
idleTimeout = time.Second
|
||||
echoID = 42573
|
||||
@@ -73,13 +76,16 @@ func TestFunnelIdleTimeout(t *testing.T) {
|
||||
require.NoError(t, proxy.Request(ctx, &pk, &newResponder))
|
||||
validateEchoFlow(t, <-newMuxer.cfdToEdge, &pk)
|
||||
|
||||
time.Sleep(idleTimeout * 2)
|
||||
cancel()
|
||||
<-proxyDone
|
||||
}
|
||||
|
||||
func TestReuseFunnel(t *testing.T) {
|
||||
defer leaktest.Check(t)()
|
||||
|
||||
const (
|
||||
idleTimeout = time.Second
|
||||
idleTimeout = time.Millisecond * 100
|
||||
echoID = 42573
|
||||
startSeq = 8129
|
||||
)
|
||||
@@ -135,6 +141,8 @@ func TestReuseFunnel(t *testing.T) {
|
||||
require.True(t, found)
|
||||
require.Equal(t, funnel1, funnel2)
|
||||
|
||||
time.Sleep(idleTimeout * 2)
|
||||
|
||||
cancel()
|
||||
<-proxyDone
|
||||
}
|
||||
|
@@ -281,10 +281,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
requestSpan.SetAttributes(
|
||||
attribute.Int("originalEchoID", echo.ID),
|
||||
attribute.Int("seq", echo.Seq),
|
||||
)
|
||||
observeICMPRequest(ip.logger, requestSpan, pk.Src.String(), pk.Dst.String(), echo.ID, echo.Seq)
|
||||
|
||||
resp, err := ip.icmpEchoRoundtrip(pk.Dst, echo)
|
||||
if err != nil {
|
||||
@@ -296,17 +293,17 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
|
||||
responder.exportSpan()
|
||||
|
||||
_, replySpan := responder.replySpan(ctx, ip.logger)
|
||||
replySpan.SetAttributes(
|
||||
attribute.Int("originalEchoID", echo.ID),
|
||||
attribute.Int("seq", echo.Seq),
|
||||
attribute.Int64("rtt", int64(resp.rtt())),
|
||||
attribute.String("status", resp.status().String()),
|
||||
)
|
||||
err = ip.handleEchoReply(pk, echo, resp, responder)
|
||||
if err != nil {
|
||||
ip.logger.Err(err).Msg("Failed to send ICMP reply")
|
||||
tracing.EndWithErrorStatus(replySpan, err)
|
||||
return errors.Wrap(err, "failed to handle ICMP echo reply")
|
||||
}
|
||||
observeICMPReply(ip.logger, replySpan, pk.Dst.String(), echo.ID, echo.Seq)
|
||||
replySpan.SetAttributes(
|
||||
attribute.Int64("rtt", int64(resp.rtt())),
|
||||
attribute.String("status", resp.status().String()),
|
||||
)
|
||||
tracing.End(replySpan)
|
||||
return nil
|
||||
}
|
||||
|
@@ -7,6 +7,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/net/icmp"
|
||||
"golang.org/x/net/ipv4"
|
||||
"golang.org/x/net/ipv6"
|
||||
@@ -15,9 +17,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// funnelIdleTimeout controls how long to wait to close a funnel without send/return
|
||||
funnelIdleTimeout = time.Second * 10
|
||||
mtu = 1500
|
||||
mtu = 1500
|
||||
// icmpRequestTimeoutMs controls how long to wait for a reply
|
||||
icmpRequestTimeoutMs = 1000
|
||||
)
|
||||
@@ -32,8 +32,9 @@ type icmpRouter struct {
|
||||
}
|
||||
|
||||
// NewICMPRouter doesn't return an error if either ipv4 proxy or ipv6 proxy can be created. The machine might only
|
||||
// support one of them
|
||||
func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerolog.Logger) (*icmpRouter, error) {
|
||||
// support one of them.
|
||||
// funnelIdleTimeout controls how long to wait to close a funnel without send/return
|
||||
func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerolog.Logger, funnelIdleTimeout time.Duration) (*icmpRouter, error) {
|
||||
ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, "", logger, funnelIdleTimeout)
|
||||
ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, ipv6Zone, logger, funnelIdleTimeout)
|
||||
if ipv4Err != nil && ipv6Err != nil {
|
||||
@@ -102,3 +103,25 @@ func getICMPEcho(msg *icmp.Message) (*icmp.Echo, error) {
|
||||
func isEchoReply(msg *icmp.Message) bool {
|
||||
return msg.Type == ipv4.ICMPTypeEchoReply || msg.Type == ipv6.ICMPTypeEchoReply
|
||||
}
|
||||
|
||||
func observeICMPRequest(logger *zerolog.Logger, span trace.Span, src string, dst string, echoID int, seq int) {
|
||||
logger.Debug().
|
||||
Str("src", src).
|
||||
Str("dst", dst).
|
||||
Int("originalEchoID", echoID).
|
||||
Int("originalEchoSeq", seq).
|
||||
Msg("Received ICMP request")
|
||||
span.SetAttributes(
|
||||
attribute.Int("originalEchoID", echoID),
|
||||
attribute.Int("seq", seq),
|
||||
)
|
||||
}
|
||||
|
||||
func observeICMPReply(logger *zerolog.Logger, span trace.Span, dst string, echoID int, seq int) {
|
||||
logger.Debug().Str("dst", dst).Int("echoID", echoID).Int("seq", seq).Msg("Sent ICMP reply to edge")
|
||||
span.SetAttributes(
|
||||
attribute.String("dst", dst),
|
||||
attribute.Int("echoID", echoID),
|
||||
attribute.Int("seq", seq),
|
||||
)
|
||||
}
|
||||
|
@@ -9,7 +9,9 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fortytw2/leaktest"
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -23,9 +25,10 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
noopLogger = zerolog.Nop()
|
||||
localhostIP = netip.MustParseAddr("127.0.0.1")
|
||||
localhostIPv6 = netip.MustParseAddr("::1")
|
||||
noopLogger = zerolog.Nop()
|
||||
localhostIP = netip.MustParseAddr("127.0.0.1")
|
||||
localhostIPv6 = netip.MustParseAddr("::1")
|
||||
testFunnelIdleTimeout = time.Millisecond * 10
|
||||
)
|
||||
|
||||
// TestICMPProxyEcho makes sure we can send ICMP echo via the Request method and receives response via the
|
||||
@@ -40,12 +43,14 @@ func TestICMPRouterEcho(t *testing.T) {
|
||||
}
|
||||
|
||||
func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
|
||||
defer leaktest.Check(t)()
|
||||
|
||||
const (
|
||||
echoID = 36571
|
||||
endSeq = 20
|
||||
)
|
||||
|
||||
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
|
||||
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout)
|
||||
require.NoError(t, err)
|
||||
|
||||
proxyDone := make(chan struct{})
|
||||
@@ -97,14 +102,19 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
|
||||
validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure funnel cleanup kicks in
|
||||
time.Sleep(testFunnelIdleTimeout * 2)
|
||||
cancel()
|
||||
<-proxyDone
|
||||
}
|
||||
|
||||
func TestTraceICMPRouterEcho(t *testing.T) {
|
||||
defer leaktest.Check(t)()
|
||||
|
||||
tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1"
|
||||
|
||||
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
|
||||
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout)
|
||||
require.NoError(t, err)
|
||||
|
||||
proxyDone := make(chan struct{})
|
||||
@@ -196,6 +206,7 @@ func TestTraceICMPRouterEcho(t *testing.T) {
|
||||
default:
|
||||
}
|
||||
|
||||
time.Sleep(testFunnelIdleTimeout * 2)
|
||||
cancel()
|
||||
<-proxyDone
|
||||
}
|
||||
@@ -203,12 +214,14 @@ func TestTraceICMPRouterEcho(t *testing.T) {
|
||||
// TestConcurrentRequests makes sure icmpRouter can send concurrent requests to the same destination with different
|
||||
// echo ID. This simulates concurrent ping to the same destination.
|
||||
func TestConcurrentRequestsToSameDst(t *testing.T) {
|
||||
defer leaktest.Check(t)()
|
||||
|
||||
const (
|
||||
concurrentPings = 5
|
||||
endSeq = 5
|
||||
)
|
||||
|
||||
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
|
||||
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout)
|
||||
require.NoError(t, err)
|
||||
|
||||
proxyDone := make(chan struct{})
|
||||
@@ -282,12 +295,16 @@ func TestConcurrentRequestsToSameDst(t *testing.T) {
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
time.Sleep(testFunnelIdleTimeout * 2)
|
||||
cancel()
|
||||
<-proxyDone
|
||||
}
|
||||
|
||||
// TestICMPProxyRejectNotEcho makes sure it rejects messages other than echo
|
||||
func TestICMPRouterRejectNotEcho(t *testing.T) {
|
||||
defer leaktest.Check(t)()
|
||||
|
||||
msgs := []icmp.Message{
|
||||
{
|
||||
Type: ipv4.ICMPTypeDestinationUnreachable,
|
||||
@@ -341,7 +358,7 @@ func TestICMPRouterRejectNotEcho(t *testing.T) {
|
||||
}
|
||||
|
||||
func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp.Message) {
|
||||
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
|
||||
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout)
|
||||
require.NoError(t, err)
|
||||
|
||||
muxer := newMockMuxer(1)
|
||||
|
Reference in New Issue
Block a user