mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-05-12 04:46:34 +00:00

UDP session would check if the socket was closed before returning but the net.ErrClosed could be wrapped in another error.
139 lines
4.2 KiB
Go
139 lines
4.2 KiB
Go
package datagramsession
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/rs/zerolog"
|
|
|
|
"github.com/cloudflare/cloudflared/packet"
|
|
)
|
|
|
|
const (
|
|
defaultCloseIdleAfter = time.Second * 210
|
|
)
|
|
|
|
func SessionIdleErr(timeout time.Duration) error {
|
|
return fmt.Errorf("session idle for %v", timeout)
|
|
}
|
|
|
|
type transportSender func(session *packet.Session) error
|
|
|
|
// Session is a bidirectional pipe of datagrams between transport and dstConn
|
|
// Destination can be a connection with origin or with eyeball
|
|
// When the destination is origin:
|
|
// - Manager receives datagrams from receiveChan and calls the transportToDst method of the Session to send to origin
|
|
// - Datagrams from origin are read from conn and Send to transport using the transportSender callback. Transport will return them to eyeball
|
|
// When the destination is eyeball:
|
|
// - Datagrams from eyeball are read from conn and Send to transport. Transport will send them to cloudflared using the transportSender callback.
|
|
// - Manager receives datagrams from receiveChan and calls the transportToDst method of the Session to send to the eyeball
|
|
type Session struct {
|
|
ID uuid.UUID
|
|
sendFunc transportSender
|
|
dstConn io.ReadWriteCloser
|
|
// activeAtChan is used to communicate the last read/write time
|
|
activeAtChan chan time.Time
|
|
closeChan chan error
|
|
log *zerolog.Logger
|
|
}
|
|
|
|
func (s *Session) Serve(ctx context.Context, closeAfterIdle time.Duration) (closedByRemote bool, err error) {
|
|
go func() {
|
|
// QUIC implementation copies data to another buffer before returning https://github.com/lucas-clemente/quic-go/blob/v0.24.0/session.go#L1967-L1975
|
|
// This makes it safe to share readBuffer between iterations
|
|
const maxPacketSize = 1500
|
|
readBuffer := make([]byte, maxPacketSize)
|
|
for {
|
|
if closeSession, err := s.dstToTransport(readBuffer); err != nil {
|
|
if errors.Is(err, net.ErrClosed) {
|
|
s.log.Debug().Msg("Destination connection closed")
|
|
} else {
|
|
s.log.Error().Err(err).Msg("Failed to send session payload from destination to transport")
|
|
}
|
|
if closeSession {
|
|
s.closeChan <- err
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
err = s.waitForCloseCondition(ctx, closeAfterIdle)
|
|
if closeSession, ok := err.(*errClosedSession); ok {
|
|
closedByRemote = closeSession.byRemote
|
|
}
|
|
return closedByRemote, err
|
|
}
|
|
|
|
func (s *Session) waitForCloseCondition(ctx context.Context, closeAfterIdle time.Duration) error {
|
|
// Closing dstConn cancels read so dstToTransport routine in Serve() can return
|
|
defer s.dstConn.Close()
|
|
if closeAfterIdle == 0 {
|
|
// provide deafult is caller doesn't specify one
|
|
closeAfterIdle = defaultCloseIdleAfter
|
|
}
|
|
|
|
checkIdleFreq := closeAfterIdle / 8
|
|
checkIdleTicker := time.NewTicker(checkIdleFreq)
|
|
defer checkIdleTicker.Stop()
|
|
|
|
activeAt := time.Now()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case reason := <-s.closeChan:
|
|
return reason
|
|
// TODO: TUN-5423 evaluate if using atomic is more efficient
|
|
case now := <-checkIdleTicker.C:
|
|
// The session is considered inactive if current time is after (last active time + allowed idle time)
|
|
if now.After(activeAt.Add(closeAfterIdle)) {
|
|
return SessionIdleErr(closeAfterIdle)
|
|
}
|
|
case activeAt = <-s.activeAtChan: // Update last active time
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Session) dstToTransport(buffer []byte) (closeSession bool, err error) {
|
|
n, err := s.dstConn.Read(buffer)
|
|
s.markActive()
|
|
// https://pkg.go.dev/io#Reader suggests caller should always process n > 0 bytes
|
|
if n > 0 || err == nil {
|
|
session := packet.Session{
|
|
ID: s.ID,
|
|
Payload: buffer[:n],
|
|
}
|
|
if sendErr := s.sendFunc(&session); sendErr != nil {
|
|
return false, sendErr
|
|
}
|
|
}
|
|
return err != nil, err
|
|
}
|
|
|
|
func (s *Session) transportToDst(payload []byte) (int, error) {
|
|
s.markActive()
|
|
n, err := s.dstConn.Write(payload)
|
|
if err != nil {
|
|
s.log.Err(err).Msg("Failed to write payload to session")
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Sends the last active time to the idle checker loop without blocking. activeAtChan will only be full when there
|
|
// are many concurrent read/write. It is fine to lose some precision
|
|
func (s *Session) markActive() {
|
|
select {
|
|
case s.activeAtChan <- time.Now():
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (s *Session) close(err *errClosedSession) {
|
|
s.closeChan <- err
|
|
}
|