TUN-6666: Define packet package

This package defines IP and ICMP packet, decoders, encoder and flow
This commit is contained in:
cthuang
2022-08-17 16:46:49 +01:00
parent 20ed7557f9
commit bad2e8e812
242 changed files with 49761 additions and 2642 deletions

View File

@@ -8,21 +8,18 @@ import (
"github.com/lucas-clemente/quic-go"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/packet"
)
const (
sessionIDLen = len(uuid.UUID{})
)
type SessionDatagram struct {
ID uuid.UUID
Payload []byte
}
type BaseDatagramMuxer interface {
// MuxSession suffix the session ID to the payload so the other end of the QUIC connection can demultiplex the
// payload from multiple datagram sessions
MuxSession(sessionID uuid.UUID, payload []byte) error
// SendToSession suffix the session ID to the payload so the other end of the QUIC connection can demultiplex the
// payload from multiple datagram sessions.
SendToSession(session *packet.Session) error
// ServeReceive starts a loop to receive datagrams from the QUIC connection
ServeReceive(ctx context.Context) error
}
@@ -30,10 +27,10 @@ type BaseDatagramMuxer interface {
type DatagramMuxer struct {
session quic.Connection
logger *zerolog.Logger
demuxChan chan<- *SessionDatagram
demuxChan chan<- *packet.Session
}
func NewDatagramMuxer(quicSession quic.Connection, log *zerolog.Logger, demuxChan chan<- *SessionDatagram) *DatagramMuxer {
func NewDatagramMuxer(quicSession quic.Connection, log *zerolog.Logger, demuxChan chan<- *packet.Session) *DatagramMuxer {
logger := log.With().Uint8("datagramVersion", 1).Logger()
return &DatagramMuxer{
session: quicSession,
@@ -47,13 +44,13 @@ func (dm *DatagramMuxer) mtu() int {
return maxDatagramPayloadSize
}
func (dm *DatagramMuxer) MuxSession(sessionID uuid.UUID, payload []byte) error {
if len(payload) > dm.mtu() {
func (dm *DatagramMuxer) SendToSession(session *packet.Session) error {
if len(session.Payload) > dm.mtu() {
// TODO: TUN-5302 return ICMP packet too big message
// drop packet for now, eventually reply with ICMP for PMTUD
return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(payload), dm.mtu())
return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(session.Payload), dm.mtu())
}
payloadWithMetadata, err := suffixSessionID(sessionID, payload)
payloadWithMetadata, err := suffixSessionID(session.ID, session.Payload)
if err != nil {
return errors.Wrap(err, "Failed to suffix session ID to datagram, it will be dropped")
}
@@ -86,7 +83,7 @@ func (dm *DatagramMuxer) demux(ctx context.Context, msg []byte) error {
if err != nil {
return err
}
sessionDatagram := SessionDatagram{
sessionDatagram := packet.Session{
ID: sessionID,
Payload: payload,
}

View File

@@ -1,7 +1,6 @@
package quic
import (
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
@@ -18,6 +17,8 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/cloudflare/cloudflared/packet"
)
var (
@@ -57,7 +58,7 @@ func TestDatagram(t *testing.T) {
maxPayload := make([]byte, maxDatagramPayloadSize)
noPayloadSession := uuid.New()
maxPayloadSession := uuid.New()
sessionToPayload := []*SessionDatagram{
sessionToPayload := []*packet.Session{
{
ID: noPayloadSession,
Payload: make([]byte, 0),
@@ -75,7 +76,7 @@ func TestDatagram(t *testing.T) {
testDatagram(t, 2, sessionToPayload, flowPayloads)
}
func testDatagram(t *testing.T, version uint8, sessionToPayloads []*SessionDatagram, packetPayloads [][]byte) {
func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Session, packetPayloads [][]byte) {
quicConfig := &quic.Config{
KeepAlivePeriod: 5 * time.Millisecond,
EnableDatagrams: true,
@@ -95,7 +96,7 @@ func testDatagram(t *testing.T, version uint8, sessionToPayloads []*SessionDatag
return err
}
sessionDemuxChan := make(chan *SessionDatagram, 16)
sessionDemuxChan := make(chan *packet.Session, 16)
switch version {
case 1:
@@ -151,11 +152,14 @@ func testDatagram(t *testing.T, version uint8, sessionToPayloads []*SessionDatag
return fmt.Errorf("unknown datagram version %d", version)
}
for _, sessionDatagram := range sessionToPayloads {
require.NoError(t, muxer.MuxSession(sessionDatagram.ID, sessionDatagram.Payload))
for _, session := range sessionToPayloads {
require.NoError(t, muxer.SendToSession(session))
}
// Payload larger than transport MTU, should not be sent
require.Error(t, muxer.MuxSession(testSessionID, largePayload))
require.Error(t, muxer.SendToSession(&packet.Session{
ID: testSessionID,
Payload: largePayload,
}))
// Wait for edge to finish receiving the messages
time.Sleep(time.Millisecond * 100)
@@ -198,35 +202,3 @@ func generateTLSConfig() *tls.Config {
NextProtos: []string{"argotunnel"},
}
}
type sessionMuxer interface {
SendToSession(sessionID uuid.UUID, payload []byte) error
}
type mockSessionReceiver struct {
expectedSessionToPayload map[uuid.UUID][]byte
receivedCount int
}
func (msr *mockSessionReceiver) ReceiveDatagram(sessionID uuid.UUID, payload []byte) error {
expectedPayload := msr.expectedSessionToPayload[sessionID]
if !bytes.Equal(expectedPayload, payload) {
return fmt.Errorf("expect %v to have payload %s, got %s", sessionID, string(expectedPayload), string(payload))
}
msr.receivedCount++
return nil
}
type mockFlowReceiver struct {
expectedPayloads [][]byte
receivedCount int
}
func (mfr *mockFlowReceiver) ReceiveFlow(payload []byte) error {
expectedPayload := mfr.expectedPayloads[mfr.receivedCount]
if !bytes.Equal(expectedPayload, payload) {
return fmt.Errorf("expect flow %d to have payload %s, got %s", mfr.receivedCount, string(expectedPayload), string(payload))
}
mfr.receivedCount++
return nil
}

View File

@@ -4,10 +4,11 @@ import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/lucas-clemente/quic-go"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/packet"
)
type datagramV2Type byte
@@ -33,14 +34,14 @@ func (dm *DatagramMuxerV2) mtu() int {
type DatagramMuxerV2 struct {
session quic.Connection
logger *zerolog.Logger
sessionDemuxChan chan<- *SessionDatagram
sessionDemuxChan chan<- *packet.Session
packetDemuxChan chan<- []byte
}
func NewDatagramMuxerV2(
quicSession quic.Connection,
log *zerolog.Logger,
sessionDemuxChan chan<- *SessionDatagram,
sessionDemuxChan chan<- *packet.Session,
packetDemuxChan chan<- []byte) *DatagramMuxerV2 {
logger := log.With().Uint8("datagramVersion", 2).Logger()
return &DatagramMuxerV2{
@@ -53,12 +54,12 @@ func NewDatagramMuxerV2(
// MuxSession suffix the session ID and datagram version to the payload so the other end of the QUIC connection can
// demultiplex the payload from multiple datagram sessions
func (dm *DatagramMuxerV2) MuxSession(sessionID uuid.UUID, payload []byte) error {
if len(payload) > dm.mtu() {
func (dm *DatagramMuxerV2) SendToSession(session *packet.Session) error {
if len(session.Payload) > dm.mtu() {
// TODO: TUN-5302 return ICMP packet too big message
return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(payload), dm.mtu())
return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(session.Payload), dm.mtu())
}
msgWithID, err := suffixSessionID(sessionID, payload)
msgWithID, err := suffixSessionID(session.ID, session.Payload)
if err != nil {
return errors.Wrap(err, "Failed to suffix session ID to datagram, it will be dropped")
}
@@ -113,7 +114,7 @@ func (dm *DatagramMuxerV2) demux(ctx context.Context, msgWithType []byte) error
if err != nil {
return err
}
sessionDatagram := SessionDatagram{
sessionDatagram := packet.Session{
ID: sessionID,
Payload: payload,
}