mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 17:19:58 +00:00
Reverts the following:
Revert "TUN-8621: Fix cloudflared version in change notes." Revert "PPIP-2310: Update quick tunnel disclaimer" Revert "TUN-8621: Prevent QUIC connection from closing before grace period after unregistering" Revert "TUN-8484: Print response when QuickTunnel can't be unmarshalled" Revert "TUN-8592: Use metadata from the edge to determine if request body is empty for QUIC transport"
This commit is contained in:
@@ -6,8 +6,6 @@ import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/cloudflare/cloudflared/management"
|
||||
"github.com/cloudflare/cloudflared/tunnelrpc"
|
||||
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
||||
@@ -118,32 +116,27 @@ func (c *controlStream) ServeControlStream(
|
||||
}
|
||||
}
|
||||
|
||||
return c.waitForUnregister(ctx, registrationClient)
|
||||
c.waitForUnregister(ctx, registrationClient)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) error {
|
||||
func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) {
|
||||
// wait for connection termination or start of graceful shutdown
|
||||
defer registrationClient.Close()
|
||||
var shutdownError error
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
shutdownError = ctx.Err()
|
||||
break
|
||||
case <-c.gracefulShutdownC:
|
||||
c.stoppedGracefully = true
|
||||
}
|
||||
|
||||
c.observer.sendUnregisteringEvent(c.connIndex)
|
||||
err := registrationClient.GracefulShutdown(ctx, c.gracePeriod)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Error shutting down control stream")
|
||||
}
|
||||
registrationClient.GracefulShutdown(ctx, c.gracePeriod)
|
||||
c.observer.log.Info().
|
||||
Int(management.EventTypeKey, int(management.Cloudflared)).
|
||||
Uint8(LogFieldConnIndex, c.connIndex).
|
||||
IPAddr(LogFieldIPAddress, c.edgeAddress).
|
||||
Msg("Unregistered tunnel connection")
|
||||
return shutdownError
|
||||
}
|
||||
|
||||
func (c *controlStream) IsStopped() bool {
|
||||
|
@@ -192,9 +192,8 @@ func (mc mockNamedTunnelRPCClient) RegisterConnection(
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error {
|
||||
func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
|
||||
close(mc.unregistered)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mockNamedTunnelRPCClient) Close() {}
|
||||
|
@@ -42,26 +42,12 @@ const (
|
||||
HTTPMethodKey = "HttpMethod"
|
||||
// HTTPHostKey is used to get or set http Method in QUIC ALPN if the underlying proxy connection type is HTTP.
|
||||
HTTPHostKey = "HttpHost"
|
||||
// HTTPRequestBodyHintKey is used in ConnectRequest metadata to indicate if the request has body
|
||||
HTTPRequestBodyHintKey = "HttpReqBodyHint"
|
||||
|
||||
QUICMetadataFlowID = "FlowID"
|
||||
// emperically this capacity has been working well
|
||||
demuxChanCapacity = 16
|
||||
)
|
||||
|
||||
type RequestBodyHint uint64
|
||||
|
||||
const (
|
||||
RequestBodyHintMissing RequestBodyHint = iota
|
||||
RequestBodyHintEmpty
|
||||
RequestBodyHintHasData
|
||||
)
|
||||
|
||||
func (rbh RequestBodyHint) String() string {
|
||||
return [...]string{"missing", "empty", "data"}[rbh]
|
||||
}
|
||||
|
||||
var (
|
||||
portForConnIndex = make(map[uint8]int, 0)
|
||||
portMapMutex sync.Mutex
|
||||
@@ -83,7 +69,6 @@ type QUICConnection struct {
|
||||
|
||||
rpcTimeout time.Duration
|
||||
streamWriteTimeout time.Duration
|
||||
gracePeriod time.Duration
|
||||
}
|
||||
|
||||
// NewQUICConnection returns a new instance of QUICConnection.
|
||||
@@ -101,7 +86,6 @@ func NewQUICConnection(
|
||||
packetRouterConfig *ingress.GlobalRouterConfig,
|
||||
rpcTimeout time.Duration,
|
||||
streamWriteTimeout time.Duration,
|
||||
gracePeriod time.Duration,
|
||||
) (*QUICConnection, error) {
|
||||
udpConn, err := createUDPConnForConnIndex(connIndex, localAddr, logger)
|
||||
if err != nil {
|
||||
@@ -138,7 +122,6 @@ func NewQUICConnection(
|
||||
connIndex: connIndex,
|
||||
rpcTimeout: rpcTimeout,
|
||||
streamWriteTimeout: streamWriteTimeout,
|
||||
gracePeriod: gracePeriod,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -161,17 +144,8 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
|
||||
// In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
|
||||
// stream is already fully registered before the other goroutines can proceed.
|
||||
errGroup.Go(func() error {
|
||||
// err is equal to nil if we exit due to unregistration. If that happens we want to wait the full
|
||||
// amount of the grace period, allowing requests to finish before we cancel the context, which will
|
||||
// make cloudflared exit.
|
||||
if err := q.serveControlStream(ctx, controlStream); err == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-time.Tick(q.gracePeriod):
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
return err
|
||||
defer cancel()
|
||||
return q.serveControlStream(ctx, controlStream)
|
||||
})
|
||||
errGroup.Go(func() error {
|
||||
defer cancel()
|
||||
@@ -500,6 +474,7 @@ func buildHTTPRequest(
|
||||
dest := connectRequest.Dest
|
||||
method := metadata[HTTPMethodKey]
|
||||
host := metadata[HTTPHostKey]
|
||||
isWebsocket := connectRequest.Type == pogs.ConnectionTypeWebsocket
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, method, dest, body)
|
||||
if err != nil {
|
||||
@@ -524,8 +499,13 @@ func buildHTTPRequest(
|
||||
return nil, fmt.Errorf("Error setting content-length: %w", err)
|
||||
}
|
||||
|
||||
if shouldSetRequestBodyToEmpty(connectRequest, metadata, req) {
|
||||
log.Debug().Str("host", req.Host).Str("method", req.Method).Msg("Set request to have no body")
|
||||
// Go's client defaults to chunked encoding after a 200ms delay if the following cases are true:
|
||||
// * the request body blocks
|
||||
// * the content length is not set (or set to -1)
|
||||
// * the method doesn't usually have a body (GET, HEAD, DELETE, ...)
|
||||
// * there is no transfer-encoding=chunked already set.
|
||||
// So, if transfer cannot be chunked and content length is 0, we dont set a request body.
|
||||
if !isWebsocket && !isTransferEncodingChunked(req) && req.ContentLength == 0 {
|
||||
req.Body = http.NoBody
|
||||
}
|
||||
stripWebsocketUpgradeHeader(req)
|
||||
@@ -550,35 +530,6 @@ func isTransferEncodingChunked(req *http.Request) bool {
|
||||
return strings.Contains(strings.ToLower(transferEncodingVal), "chunked")
|
||||
}
|
||||
|
||||
// Borrowed from https://github.com/golang/go/blob/go1.22.6/src/net/http/request.go#L1541
|
||||
func requestMethodUsuallyLacksBody(req *http.Request) bool {
|
||||
switch strings.ToUpper(req.Method) {
|
||||
case "GET", "HEAD", "DELETE", "OPTIONS", "PROPFIND", "SEARCH":
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func shouldSetRequestBodyToEmpty(connectRequest *pogs.ConnectRequest, metadata map[string]string, req *http.Request) bool {
|
||||
switch metadata[HTTPRequestBodyHintKey] {
|
||||
case RequestBodyHintEmpty.String():
|
||||
return true
|
||||
case RequestBodyHintHasData.String():
|
||||
return false
|
||||
default:
|
||||
}
|
||||
|
||||
isWebsocket := connectRequest.Type == pogs.ConnectionTypeWebsocket
|
||||
// Go's client defaults to chunked encoding after a 200ms delay if the following cases are true:
|
||||
// * the request body blocks
|
||||
// * the content length is not set (or set to -1)
|
||||
// * the method doesn't usually have a body (GET, HEAD, DELETE, ...)
|
||||
// * there is no transfer-encoding=chunked already set.
|
||||
// So, if transfer cannot be chunked and content length is 0, we dont set a request body.
|
||||
// Reference: https://github.com/golang/go/blob/go1.22.2/src/net/http/transfer.go#L192-L206
|
||||
return !isWebsocket && requestMethodUsuallyLacksBody(req) && !isTransferEncodingChunked(req) && req.ContentLength == 0
|
||||
}
|
||||
|
||||
// A helper struct that guarantees a call to close only affects read side, but not write side.
|
||||
type nopCloserReadWriter struct {
|
||||
io.ReadWriteCloser
|
||||
|
@@ -484,125 +484,6 @@ func TestBuildHTTPRequest(t *testing.T) {
|
||||
},
|
||||
body: io.NopCloser(&bytes.Buffer{}),
|
||||
},
|
||||
{
|
||||
name: "if edge sends the body is empty hint, set body to empty",
|
||||
connectRequest: &pogs.ConnectRequest{
|
||||
Dest: "http://test.com",
|
||||
Metadata: []pogs.Metadata{
|
||||
{
|
||||
Key: "HttpHeader:Another-Header",
|
||||
Val: "Misc",
|
||||
},
|
||||
{
|
||||
Key: "HttpHost",
|
||||
Val: "cf.host",
|
||||
},
|
||||
{
|
||||
Key: "HttpMethod",
|
||||
Val: "put",
|
||||
},
|
||||
{
|
||||
Key: HTTPRequestBodyHintKey,
|
||||
Val: RequestBodyHintEmpty.String(),
|
||||
},
|
||||
},
|
||||
},
|
||||
req: &http.Request{
|
||||
Method: "put",
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "test.com",
|
||||
},
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
Header: http.Header{
|
||||
"Another-Header": []string{"Misc"},
|
||||
},
|
||||
ContentLength: 0,
|
||||
Host: "cf.host",
|
||||
Body: http.NoBody,
|
||||
},
|
||||
body: io.NopCloser(&bytes.Buffer{}),
|
||||
},
|
||||
{
|
||||
name: "if edge sends the body has data hint, don't set body to empty",
|
||||
connectRequest: &pogs.ConnectRequest{
|
||||
Dest: "http://test.com",
|
||||
Metadata: []pogs.Metadata{
|
||||
{
|
||||
Key: "HttpHeader:Another-Header",
|
||||
Val: "Misc",
|
||||
},
|
||||
{
|
||||
Key: "HttpHost",
|
||||
Val: "cf.host",
|
||||
},
|
||||
{
|
||||
Key: "HttpMethod",
|
||||
Val: "put",
|
||||
},
|
||||
{
|
||||
Key: HTTPRequestBodyHintKey,
|
||||
Val: RequestBodyHintHasData.String(),
|
||||
},
|
||||
},
|
||||
},
|
||||
req: &http.Request{
|
||||
Method: "put",
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "test.com",
|
||||
},
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
Header: http.Header{
|
||||
"Another-Header": []string{"Misc"},
|
||||
},
|
||||
ContentLength: 0,
|
||||
Host: "cf.host",
|
||||
Body: io.NopCloser(&bytes.Buffer{}),
|
||||
},
|
||||
body: io.NopCloser(&bytes.Buffer{}),
|
||||
},
|
||||
{
|
||||
name: "if the http method usually has body, don't set body to empty",
|
||||
connectRequest: &pogs.ConnectRequest{
|
||||
Dest: "http://test.com",
|
||||
Metadata: []pogs.Metadata{
|
||||
{
|
||||
Key: "HttpHeader:Another-Header",
|
||||
Val: "Misc",
|
||||
},
|
||||
{
|
||||
Key: "HttpHost",
|
||||
Val: "cf.host",
|
||||
},
|
||||
{
|
||||
Key: "HttpMethod",
|
||||
Val: "post",
|
||||
},
|
||||
},
|
||||
},
|
||||
req: &http.Request{
|
||||
Method: "post",
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "test.com",
|
||||
},
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
Header: http.Header{
|
||||
"Another-Header": []string{"Misc"},
|
||||
},
|
||||
ContentLength: 0,
|
||||
Host: "cf.host",
|
||||
Body: io.NopCloser(&bytes.Buffer{}),
|
||||
},
|
||||
body: io.NopCloser(&bytes.Buffer{}),
|
||||
},
|
||||
}
|
||||
|
||||
log := zerolog.Nop()
|
||||
@@ -855,7 +736,6 @@ func testQUICConnection(udpListenerAddr net.Addr, t *testing.T, index uint8) *QU
|
||||
nil,
|
||||
15*time.Second,
|
||||
0*time.Second,
|
||||
0*time.Second,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
return qc
|
||||
|
Reference in New Issue
Block a user