mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-04-27 08:56:34 +00:00
TUN-9255: Improve flush on write conditions in http2 tunnel type to match what is done on the edge
## Summary We have adapted our edge services to better know when they should flush on write. This is an important feature to ensure response types like Server Side Events are not buffered, and instead are propagated to the eyeball as soon as possible. This commit implements a similar logic for http2 tunnel protocol that we use in our edge services. By adding the new events stream header for json `application/x-ndjson` and using the content-length and transfer-encoding headers as well, following the RFC's: - https://datatracker.ietf.org/doc/html/rfc7230#section-4.1 - https://datatracker.ietf.org/doc/html/rfc9112#section-6.1 Closes TUN-9255
This commit is contained in:
parent
86e8585563
commit
73a9980f38
@ -26,14 +26,20 @@ const (
|
||||
MaxGracePeriod = time.Minute * 3
|
||||
MaxConcurrentStreams = math.MaxUint32
|
||||
|
||||
contentTypeHeader = "content-type"
|
||||
sseContentType = "text/event-stream"
|
||||
grpcContentType = "application/grpc"
|
||||
contentTypeHeader = "content-type"
|
||||
contentLengthHeader = "content-length"
|
||||
transferEncodingHeader = "transfer-encoding"
|
||||
|
||||
sseContentType = "text/event-stream"
|
||||
grpcContentType = "application/grpc"
|
||||
sseJsonContentType = "application/x-ndjson"
|
||||
|
||||
chunkTransferEncoding = "chunked"
|
||||
)
|
||||
|
||||
var (
|
||||
switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, http.StatusText(http.StatusSwitchingProtocols))
|
||||
flushableContentTypes = []string{sseContentType, grpcContentType}
|
||||
flushableContentTypes = []string{sseContentType, grpcContentType, sseJsonContentType}
|
||||
)
|
||||
|
||||
// TunnelConnection represents the connection to the edge.
|
||||
@ -274,6 +280,22 @@ type ConnectedFuse interface {
|
||||
// Helper method to let the caller know what content-types should require a flush on every
|
||||
// write to a ResponseWriter.
|
||||
func shouldFlush(headers http.Header) bool {
|
||||
// When doing Server Side Events (SSE), some frameworks don't respect the `Content-Type` header.
|
||||
// Therefore, we need to rely on other ways to know whether we should flush on write or not. A good
|
||||
// approach is to assume that responses without `Content-Length` or with `Transfer-Encoding: chunked`
|
||||
// are streams, and therefore, should be flushed right away to the eyeball.
|
||||
// References:
|
||||
// - https://datatracker.ietf.org/doc/html/rfc7230#section-4.1
|
||||
// - https://datatracker.ietf.org/doc/html/rfc9112#section-6.1
|
||||
if contentLength := headers.Get(contentLengthHeader); contentLength == "" {
|
||||
return true
|
||||
}
|
||||
if transferEncoding := headers.Get(transferEncodingHeader); transferEncoding != "" {
|
||||
transferEncoding = strings.ToLower(transferEncoding)
|
||||
if strings.Contains(transferEncoding, chunkTransferEncoding) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
if contentType := headers.Get(contentTypeHeader); contentType != "" {
|
||||
contentType = strings.ToLower(contentType)
|
||||
for _, c := range flushableContentTypes {
|
||||
@ -282,7 +304,6 @@ func shouldFlush(headers http.Header) bool {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -7,10 +7,12 @@ import (
|
||||
"io"
|
||||
"math/big"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pkgerrors "github.com/pkg/errors"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
cfdflow "github.com/cloudflare/cloudflared/flow"
|
||||
|
||||
@ -209,3 +211,48 @@ func (mcf mockConnectedFuse) Connected() {}
|
||||
func (mcf mockConnectedFuse) IsConnected() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func TestShouldFlushHeaders(t *testing.T) {
|
||||
tests := []struct {
|
||||
headers map[string]string
|
||||
shouldFlush bool
|
||||
}{
|
||||
{
|
||||
headers: map[string]string{contentTypeHeader: "application/json", contentLengthHeader: "1"},
|
||||
shouldFlush: false,
|
||||
},
|
||||
{
|
||||
headers: map[string]string{contentTypeHeader: "text/html", contentLengthHeader: "1"},
|
||||
shouldFlush: false,
|
||||
},
|
||||
{
|
||||
headers: map[string]string{contentTypeHeader: "text/event-stream", contentLengthHeader: "1"},
|
||||
shouldFlush: true,
|
||||
},
|
||||
{
|
||||
headers: map[string]string{contentTypeHeader: "application/grpc", contentLengthHeader: "1"},
|
||||
shouldFlush: true,
|
||||
},
|
||||
{
|
||||
headers: map[string]string{contentTypeHeader: "application/x-ndjson", contentLengthHeader: "1"},
|
||||
shouldFlush: true,
|
||||
},
|
||||
{
|
||||
headers: map[string]string{contentTypeHeader: "application/json"},
|
||||
shouldFlush: true,
|
||||
},
|
||||
{
|
||||
headers: map[string]string{contentTypeHeader: "application/json", contentLengthHeader: "-1", transferEncodingHeader: "chunked"},
|
||||
shouldFlush: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
headers := http.Header{}
|
||||
for k, v := range test.headers {
|
||||
headers.Add(k, v)
|
||||
}
|
||||
|
||||
require.Equal(t, test.shouldFlush, shouldFlush(headers))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user