mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 20:50:00 +00:00
TUN-5494: Send a RPC with terminate reason to edge if the session is closed locally
This commit is contained in:
@@ -168,6 +168,7 @@ func (q *QUICConnection) handleRPCStream(rpcStream *quicpogs.RPCServerStream) er
|
||||
return rpcStream.Serve(q, q.logger)
|
||||
}
|
||||
|
||||
// RegisterUdpSession is the RPC method invoked by edge to register and run a session
|
||||
func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration) error {
|
||||
// Each session is a series of datagram from an eyeball to a dstIP:dstPort.
|
||||
// (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
|
||||
@@ -178,22 +179,60 @@ func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.
|
||||
}
|
||||
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
|
||||
if err != nil {
|
||||
q.logger.Err(err).Msgf("Failed to register udp session %s", sessionID)
|
||||
q.logger.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session")
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
defer q.sessionManager.UnregisterSession(q.session.Context(), sessionID)
|
||||
if err := session.Serve(q.session.Context(), closeAfterIdleHint); err != nil {
|
||||
q.logger.Debug().Err(err).Str("sessionID", sessionID.String()).Msg("session terminated")
|
||||
}
|
||||
}()
|
||||
|
||||
go q.serveUDPSession(session, closeAfterIdleHint)
|
||||
|
||||
q.logger.Debug().Msgf("Registered session %v, %v, %v", sessionID, dstIP, dstPort)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *QUICConnection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID) error {
|
||||
q.sessionManager.UnregisterSession(ctx, sessionID)
|
||||
return nil
|
||||
func (q *QUICConnection) serveUDPSession(session *datagramsession.Session, closeAfterIdleHint time.Duration) {
|
||||
ctx := q.session.Context()
|
||||
closedByRemote, err := session.Serve(ctx, closeAfterIdleHint)
|
||||
// If session is terminated by remote, then we know it has been unregistered from session manager and edge
|
||||
if !closedByRemote {
|
||||
if err != nil {
|
||||
q.closeUDPSession(ctx, session.ID, err.Error())
|
||||
} else {
|
||||
q.closeUDPSession(ctx, session.ID, "terminated without error")
|
||||
}
|
||||
q.logger.Debug().Err(err).Str("sessionID", session.ID.String()).Msg("session terminated")
|
||||
return
|
||||
}
|
||||
q.logger.Debug().Err(err).Msg("Session terminated by edge")
|
||||
}
|
||||
|
||||
// closeUDPSession first unregisters the session from session manager, then it tries to unregister from edge
|
||||
func (q *QUICConnection) closeUDPSession(ctx context.Context, sessionID uuid.UUID, message string) {
|
||||
q.sessionManager.UnregisterSession(ctx, sessionID, message, false)
|
||||
stream, err := q.session.OpenStream()
|
||||
if err != nil {
|
||||
// Log this at debug because this is not an error if session was closed due to lost connection
|
||||
// with edge
|
||||
q.logger.Debug().Err(err).Str("sessionID", sessionID.String()).
|
||||
Msgf("Failed to open quic stream to unregister udp session with edge")
|
||||
return
|
||||
}
|
||||
rpcClientStream, err := quicpogs.NewRPCClientStream(ctx, stream, q.logger)
|
||||
if err != nil {
|
||||
// Log this at debug because this is not an error if session was closed due to lost connection
|
||||
// with edge
|
||||
q.logger.Err(err).Str("sessionID", sessionID.String()).
|
||||
Msgf("Failed to open rpc stream to unregister udp session with edge")
|
||||
return
|
||||
}
|
||||
if err := rpcClientStream.UnregisterUdpSession(ctx, sessionID, message); err != nil {
|
||||
q.logger.Err(err).Str("sessionID", sessionID.String()).
|
||||
Msgf("Failed to unregister udp session with edge")
|
||||
}
|
||||
}
|
||||
|
||||
// UnregisterUdpSession is the RPC method invoked by edge to unregister and terminate a sesssion
|
||||
func (q *QUICConnection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
|
||||
return q.sessionManager.UnregisterSession(ctx, sessionID, message, true)
|
||||
}
|
||||
|
||||
// streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
|
||||
|
@@ -17,29 +17,32 @@ import (
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
"github.com/google/uuid"
|
||||
"github.com/lucas-clemente/quic-go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/cloudflare/cloudflared/datagramsession"
|
||||
quicpogs "github.com/cloudflare/cloudflared/quic"
|
||||
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
||||
)
|
||||
|
||||
var (
|
||||
testTLSServerConfig = generateTLSConfig()
|
||||
testQUICConfig = &quic.Config{
|
||||
KeepAlive: true,
|
||||
EnableDatagrams: true,
|
||||
}
|
||||
)
|
||||
|
||||
// TestQUICServer tests if a quic server accepts and responds to a quic client with the acceptance protocol.
|
||||
// It also serves as a demonstration for communication with the QUIC connection started by a cloudflared.
|
||||
func TestQUICServer(t *testing.T) {
|
||||
quicConfig := &quic.Config{
|
||||
KeepAlive: true,
|
||||
EnableDatagrams: true,
|
||||
}
|
||||
|
||||
// Setup test.
|
||||
log := zerolog.New(os.Stdout)
|
||||
|
||||
// Start a UDP Listener for QUIC.
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
@@ -47,18 +50,6 @@ func TestQUICServer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer udpListener.Close()
|
||||
|
||||
// Create a simple tls config.
|
||||
tlsConfig := generateTLSConfig()
|
||||
|
||||
// Create a client config
|
||||
tlsClientConfig := &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
NextProtos: []string{"argotunnel"},
|
||||
}
|
||||
|
||||
// Start a mock httpProxy
|
||||
originProxy := &mockOriginProxyWithRequest{}
|
||||
|
||||
// This is simply a sample websocket frame message.
|
||||
wsBuf := &bytes.Buffer{}
|
||||
wsutil.WriteClientText(wsBuf, []byte("Hello"))
|
||||
@@ -158,25 +149,13 @@ func TestQUICServer(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
quicServer(
|
||||
t, udpListener, tlsConfig, quicConfig,
|
||||
t, udpListener, testTLSServerConfig, testQUICConfig,
|
||||
test.dest, test.connectionType, test.metadata, test.message, test.expectedResponse,
|
||||
)
|
||||
}()
|
||||
|
||||
controlStream := fakeControlStream{}
|
||||
|
||||
qC, err := NewQUICConnection(
|
||||
ctx,
|
||||
quicConfig,
|
||||
udpListener.LocalAddr(),
|
||||
tlsClientConfig,
|
||||
originProxy,
|
||||
&tunnelpogs.ConnectionOptions{},
|
||||
controlStream,
|
||||
NewObserver(&log, &log, false),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
go qC.Serve(ctx)
|
||||
qc := testQUICConnection(ctx, udpListener.LocalAddr(), t)
|
||||
go qc.Serve(ctx)
|
||||
|
||||
wg.Wait()
|
||||
cancel()
|
||||
@@ -531,3 +510,159 @@ func (moc *mockOriginProxyWithRequest) ProxyTCP(ctx context.Context, rwa ReadWri
|
||||
io.Copy(rwa, rwa)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestServeUDPSession(t *testing.T) {
|
||||
// Start a UDP Listener for QUIC.
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
udpListener, err := net.ListenUDP(udpAddr.Network(), udpAddr)
|
||||
require.NoError(t, err)
|
||||
defer udpListener.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Establish QUIC connection with edge
|
||||
edgeQUICSessionChan := make(chan quic.Session)
|
||||
go func() {
|
||||
earlyListener, err := quic.Listen(udpListener, testTLSServerConfig, testQUICConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
edgeQUICSession, err := earlyListener.Accept(ctx)
|
||||
require.NoError(t, err)
|
||||
edgeQUICSessionChan <- edgeQUICSession
|
||||
}()
|
||||
|
||||
qc := testQUICConnection(ctx, udpListener.LocalAddr(), t)
|
||||
go qc.Serve(ctx)
|
||||
|
||||
edgeQUICSession := <-edgeQUICSessionChan
|
||||
serveSession(ctx, qc, edgeQUICSession, closedByOrigin, io.EOF.Error(), t)
|
||||
serveSession(ctx, qc, edgeQUICSession, closedByTimeout, datagramsession.SessionIdleErr(time.Millisecond*50).Error(), t)
|
||||
serveSession(ctx, qc, edgeQUICSession, closedByRemote, "eyeball closed connection", t)
|
||||
cancel()
|
||||
}
|
||||
|
||||
func serveSession(ctx context.Context, qc *QUICConnection, edgeQUICSession quic.Session, closeType closeReason, expectedReason string, t *testing.T) {
|
||||
var (
|
||||
payload = []byte(t.Name())
|
||||
)
|
||||
sessionID := uuid.New()
|
||||
cfdConn, originConn := net.Pipe()
|
||||
// Registers and run a new session
|
||||
session, err := qc.sessionManager.RegisterSession(ctx, sessionID, cfdConn)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionDone := make(chan struct{})
|
||||
go func() {
|
||||
qc.serveUDPSession(session, time.Millisecond*50)
|
||||
close(sessionDone)
|
||||
}()
|
||||
|
||||
// Send a message to the quic session on edge side, it should be deumx to this datagram session
|
||||
muxedPayload, err := quicpogs.SuffixSessionID(sessionID, payload)
|
||||
require.NoError(t, err)
|
||||
err = edgeQUICSession.SendMessage(muxedPayload)
|
||||
require.NoError(t, err)
|
||||
|
||||
readBuffer := make([]byte, len(payload)+1)
|
||||
n, err := originConn.Read(readBuffer)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(payload), n)
|
||||
require.True(t, bytes.Equal(payload, readBuffer[:n]))
|
||||
|
||||
// Close connection to terminate session
|
||||
switch closeType {
|
||||
case closedByOrigin:
|
||||
originConn.Close()
|
||||
case closedByRemote:
|
||||
err = qc.UnregisterUdpSession(ctx, sessionID, expectedReason)
|
||||
require.NoError(t, err)
|
||||
case closedByTimeout:
|
||||
}
|
||||
|
||||
if closeType != closedByRemote {
|
||||
// Session was not closed by remote, so closeUDPSession should be invoked to unregister from remote
|
||||
unregisterFromEdgeChan := make(chan struct{})
|
||||
rpcServer := &mockSessionRPCServer{
|
||||
sessionID: sessionID,
|
||||
unregisterReason: expectedReason,
|
||||
calledUnregisterChan: unregisterFromEdgeChan,
|
||||
}
|
||||
go runMockSessionRPCServer(ctx, edgeQUICSession, rpcServer, t)
|
||||
|
||||
<-unregisterFromEdgeChan
|
||||
}
|
||||
|
||||
<-sessionDone
|
||||
}
|
||||
|
||||
type closeReason uint8
|
||||
|
||||
const (
|
||||
closedByOrigin closeReason = iota
|
||||
closedByRemote
|
||||
closedByTimeout
|
||||
)
|
||||
|
||||
func runMockSessionRPCServer(ctx context.Context, session quic.Session, rpcServer *mockSessionRPCServer, t *testing.T) {
|
||||
stream, err := session.AcceptStream(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
if stream.StreamID() == 0 {
|
||||
// Skip the first stream, it's the control stream of the QUIC connection
|
||||
stream, err = session.AcceptStream(ctx)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
protocol, err := quicpogs.DetermineProtocol(stream)
|
||||
assert.NoError(t, err)
|
||||
rpcServerStream, err := quicpogs.NewRPCServerStream(stream, protocol)
|
||||
assert.NoError(t, err)
|
||||
|
||||
log := zerolog.New(os.Stdout)
|
||||
err = rpcServerStream.Serve(rpcServer, &log)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
type mockSessionRPCServer struct {
|
||||
sessionID uuid.UUID
|
||||
unregisterReason string
|
||||
calledUnregisterChan chan struct{}
|
||||
}
|
||||
|
||||
func (s mockSessionRPCServer) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration) error {
|
||||
return fmt.Errorf("mockSessionRPCServer doesn't implement RegisterUdpSession")
|
||||
}
|
||||
|
||||
func (s mockSessionRPCServer) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, reason string) error {
|
||||
if s.sessionID != sessionID {
|
||||
return fmt.Errorf("expect session ID %s, got %s", s.sessionID, sessionID)
|
||||
}
|
||||
if s.unregisterReason != reason {
|
||||
return fmt.Errorf("expect unregister reason %s, got %s", s.unregisterReason, reason)
|
||||
}
|
||||
close(s.calledUnregisterChan)
|
||||
fmt.Println("unregister from edge")
|
||||
return nil
|
||||
}
|
||||
|
||||
func testQUICConnection(ctx context.Context, udpListenerAddr net.Addr, t *testing.T) *QUICConnection {
|
||||
tlsClientConfig := &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
NextProtos: []string{"argotunnel"},
|
||||
}
|
||||
// Start a mock httpProxy
|
||||
originProxy := &mockOriginProxyWithRequest{}
|
||||
log := zerolog.New(os.Stdout)
|
||||
qc, err := NewQUICConnection(
|
||||
ctx,
|
||||
testQUICConfig,
|
||||
udpListenerAddr,
|
||||
tlsClientConfig,
|
||||
originProxy,
|
||||
&tunnelpogs.ConnectionOptions{},
|
||||
fakeControlStream{},
|
||||
NewObserver(&log, &log, false),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
return qc
|
||||
}
|
||||
|
Reference in New Issue
Block a user