cloudflared/connection/control.go
GoncaloGarcia e251a21810 TUN-8621: Prevent QUIC connection from closing before grace period after unregistering
Whenever cloudflared receives a SIGTERM or SIGINT it goes into graceful shutdown mode, which unregisters the connection and closes the control stream. Unregistering makes it so we no longer receive any new requests and makes the edge close the connection, allowing in-flight requests to finish (within a 3 minute period).
 This was working fine for http2 connections, but the quic proxy was cancelling the context as soon as the controls stream ended, forcing the process to stop immediately.

 This commit changes the behavior so that we wait the full grace period before cancelling the request
2024-10-07 10:51:21 -05:00

152 lines
4.9 KiB
Go

package connection
import (
"context"
"io"
"net"
"time"
"github.com/pkg/errors"
"github.com/cloudflare/cloudflared/management"
"github.com/cloudflare/cloudflared/tunnelrpc"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
// registerClient derives a named tunnel rpc client that can then be used to register and unregister connections.
type registerClientFunc func(context.Context, io.ReadWriteCloser, time.Duration) tunnelrpc.RegistrationClient
type controlStream struct {
observer *Observer
connectedFuse ConnectedFuse
tunnelProperties *TunnelProperties
connIndex uint8
edgeAddress net.IP
protocol Protocol
registerClientFunc registerClientFunc
registerTimeout time.Duration
gracefulShutdownC <-chan struct{}
gracePeriod time.Duration
stoppedGracefully bool
}
// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
type ControlStreamHandler interface {
// ServeControlStream handles the control plane of the transport in the current goroutine calling this
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter) error
// IsStopped tells whether the method above has finished
IsStopped() bool
}
type TunnelConfigJSONGetter interface {
GetConfigJSON() ([]byte, error)
}
// NewControlStream returns a new instance of ControlStreamHandler
func NewControlStream(
observer *Observer,
connectedFuse ConnectedFuse,
tunnelProperties *TunnelProperties,
connIndex uint8,
edgeAddress net.IP,
registerClientFunc registerClientFunc,
registerTimeout time.Duration,
gracefulShutdownC <-chan struct{},
gracePeriod time.Duration,
protocol Protocol,
) ControlStreamHandler {
if registerClientFunc == nil {
registerClientFunc = tunnelrpc.NewRegistrationClient
}
return &controlStream{
observer: observer,
connectedFuse: connectedFuse,
tunnelProperties: tunnelProperties,
registerClientFunc: registerClientFunc,
registerTimeout: registerTimeout,
connIndex: connIndex,
edgeAddress: edgeAddress,
gracefulShutdownC: gracefulShutdownC,
gracePeriod: gracePeriod,
protocol: protocol,
}
}
func (c *controlStream) ServeControlStream(
ctx context.Context,
rw io.ReadWriteCloser,
connOptions *tunnelpogs.ConnectionOptions,
tunnelConfigGetter TunnelConfigJSONGetter,
) error {
registrationClient := c.registerClientFunc(ctx, rw, c.registerTimeout)
registrationDetails, err := registrationClient.RegisterConnection(
ctx,
c.tunnelProperties.Credentials.Auth(),
c.tunnelProperties.Credentials.TunnelID,
connOptions,
c.connIndex,
c.edgeAddress)
if err != nil {
defer registrationClient.Close()
if err.Error() == DuplicateConnectionError {
c.observer.metrics.regFail.WithLabelValues("dup_edge_conn", "registerConnection").Inc()
return errDuplicationConnection
}
c.observer.metrics.regFail.WithLabelValues("server_error", "registerConnection").Inc()
return serverRegistrationErrorFromRPC(err)
}
c.observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc()
c.observer.logConnected(registrationDetails.UUID, c.connIndex, registrationDetails.Location, c.edgeAddress, c.protocol)
c.observer.sendConnectedEvent(c.connIndex, c.protocol, registrationDetails.Location)
c.connectedFuse.Connected()
// if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration
if c.connIndex == 0 && !registrationDetails.TunnelIsRemotelyManaged {
if tunnelConfig, err := tunnelConfigGetter.GetConfigJSON(); err == nil {
if err := registrationClient.SendLocalConfiguration(ctx, tunnelConfig); err != nil {
c.observer.metrics.localConfigMetrics.pushesErrors.Inc()
c.observer.log.Err(err).Msg("unable to send local configuration")
}
c.observer.metrics.localConfigMetrics.pushes.Inc()
} else {
c.observer.log.Err(err).Msg("failed to obtain current configuration")
}
}
return c.waitForUnregister(ctx, registrationClient)
}
func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) error {
// wait for connection termination or start of graceful shutdown
defer registrationClient.Close()
var shutdownError error
select {
case <-ctx.Done():
shutdownError = ctx.Err()
break
case <-c.gracefulShutdownC:
c.stoppedGracefully = true
}
c.observer.sendUnregisteringEvent(c.connIndex)
err := registrationClient.GracefulShutdown(ctx, c.gracePeriod)
if err != nil {
return errors.Wrap(err, "Error shutting down control stream")
}
c.observer.log.Info().
Int(management.EventTypeKey, int(management.Cloudflared)).
Uint8(LogFieldConnIndex, c.connIndex).
IPAddr(LogFieldIPAddress, c.edgeAddress).
Msg("Unregistered tunnel connection")
return shutdownError
}
func (c *controlStream) IsStopped() bool {
return c.stoppedGracefully
}