mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-05-11 06:06:35 +00:00

In the streambased origin proxy flow (example ssh over access), there is a chance when we do not flush on http.ResponseWriter writes. This PR guarantees that the response writer passed to proxy stream has a flusher embedded after writes. This means we write much more often back to the ResponseWriter and are not waiting. Note, this is only something we do when proxyHTTP-ing to a StreamBasedOriginProxy because we do not want to have situations where we are not sending information that is needed by the other side (eyeball).
204 lines
4.8 KiB
Go
204 lines
4.8 KiB
Go
package connection
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
"github.com/cloudflare/cloudflared/stream"
|
|
"github.com/cloudflare/cloudflared/tracing"
|
|
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
|
"github.com/cloudflare/cloudflared/websocket"
|
|
)
|
|
|
|
const (
|
|
largeFileSize = 2 * 1024 * 1024
|
|
testGracePeriod = time.Millisecond * 100
|
|
)
|
|
|
|
var (
|
|
testOrchestrator = &mockOrchestrator{
|
|
originProxy: &mockOriginProxy{},
|
|
}
|
|
log = zerolog.Nop()
|
|
testLargeResp = make([]byte, largeFileSize)
|
|
)
|
|
|
|
var _ ReadWriteAcker = (*HTTPResponseReadWriteAcker)(nil)
|
|
|
|
type testRequest struct {
|
|
name string
|
|
endpoint string
|
|
expectedStatus int
|
|
expectedBody []byte
|
|
isProxyError bool
|
|
}
|
|
|
|
type mockOrchestrator struct {
|
|
originProxy OriginProxy
|
|
}
|
|
|
|
func (mcr *mockOrchestrator) GetConfigJSON() ([]byte, error) {
|
|
return nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
func (*mockOrchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
|
|
return &tunnelpogs.UpdateConfigurationResponse{
|
|
LastAppliedVersion: version,
|
|
}
|
|
}
|
|
|
|
func (mcr *mockOrchestrator) GetOriginProxy() (OriginProxy, error) {
|
|
return mcr.originProxy, nil
|
|
}
|
|
|
|
func (mcr *mockOrchestrator) WarpRoutingEnabled() (enabled bool) {
|
|
return true
|
|
}
|
|
|
|
type mockOriginProxy struct{}
|
|
|
|
func (moc *mockOriginProxy) ProxyHTTP(
|
|
w ResponseWriter,
|
|
tr *tracing.TracedHTTPRequest,
|
|
isWebsocket bool,
|
|
) error {
|
|
req := tr.Request
|
|
if isWebsocket {
|
|
switch req.URL.Path {
|
|
case "/ws/echo":
|
|
return wsEchoEndpoint(w, req)
|
|
case "/ws/flaky":
|
|
return wsFlakyEndpoint(w, req)
|
|
default:
|
|
originRespEndpoint(w, http.StatusNotFound, []byte("ws endpoint not found"))
|
|
return fmt.Errorf("Unknwon websocket endpoint %s", req.URL.Path)
|
|
}
|
|
}
|
|
switch req.URL.Path {
|
|
case "/ok":
|
|
originRespEndpoint(w, http.StatusOK, []byte(http.StatusText(http.StatusOK)))
|
|
case "/large_file":
|
|
originRespEndpoint(w, http.StatusOK, testLargeResp)
|
|
case "/400":
|
|
originRespEndpoint(w, http.StatusBadRequest, []byte(http.StatusText(http.StatusBadRequest)))
|
|
case "/500":
|
|
originRespEndpoint(w, http.StatusInternalServerError, []byte(http.StatusText(http.StatusInternalServerError)))
|
|
case "/error":
|
|
return fmt.Errorf("Failed to proxy to origin")
|
|
default:
|
|
originRespEndpoint(w, http.StatusNotFound, []byte("page not found"))
|
|
}
|
|
return nil
|
|
|
|
}
|
|
|
|
func (moc *mockOriginProxy) ProxyTCP(
|
|
ctx context.Context,
|
|
rwa ReadWriteAcker,
|
|
r *TCPRequest,
|
|
) error {
|
|
return nil
|
|
}
|
|
|
|
type echoPipe struct {
|
|
reader *io.PipeReader
|
|
writer *io.PipeWriter
|
|
}
|
|
|
|
func (ep *echoPipe) Read(p []byte) (int, error) {
|
|
return ep.reader.Read(p)
|
|
}
|
|
|
|
func (ep *echoPipe) Write(p []byte) (int, error) {
|
|
return ep.writer.Write(p)
|
|
}
|
|
|
|
// A mock origin that echos data by streaming like a tcpOverWSConnection
|
|
// https://github.com/cloudflare/cloudflared/blob/master/ingress/origin_connection.go
|
|
func wsEchoEndpoint(w ResponseWriter, r *http.Request) error {
|
|
resp := &http.Response{
|
|
StatusCode: http.StatusSwitchingProtocols,
|
|
}
|
|
if err := w.WriteRespHeaders(resp.StatusCode, resp.Header); err != nil {
|
|
return err
|
|
}
|
|
wsCtx, cancel := context.WithCancel(r.Context())
|
|
readPipe, writePipe := io.Pipe()
|
|
|
|
wsConn := websocket.NewConn(wsCtx, NewHTTPResponseReadWriterAcker(w, w.(http.Flusher), r), &log)
|
|
go func() {
|
|
select {
|
|
case <-wsCtx.Done():
|
|
case <-r.Context().Done():
|
|
}
|
|
readPipe.Close()
|
|
writePipe.Close()
|
|
}()
|
|
|
|
originConn := &echoPipe{reader: readPipe, writer: writePipe}
|
|
stream.Pipe(wsConn, originConn, &log)
|
|
cancel()
|
|
wsConn.Close()
|
|
return nil
|
|
}
|
|
|
|
type flakyConn struct {
|
|
closeAt time.Time
|
|
}
|
|
|
|
func (fc *flakyConn) Read(p []byte) (int, error) {
|
|
if time.Now().After(fc.closeAt) {
|
|
return 0, io.EOF
|
|
}
|
|
n := copy(p, "Read from flaky connection")
|
|
return n, nil
|
|
}
|
|
|
|
func (fc *flakyConn) Write(p []byte) (int, error) {
|
|
if time.Now().After(fc.closeAt) {
|
|
return 0, fmt.Errorf("flaky connection closed")
|
|
}
|
|
return len(p), nil
|
|
}
|
|
|
|
func wsFlakyEndpoint(w ResponseWriter, r *http.Request) error {
|
|
resp := &http.Response{
|
|
StatusCode: http.StatusSwitchingProtocols,
|
|
}
|
|
if err := w.WriteRespHeaders(resp.StatusCode, resp.Header); err != nil {
|
|
return err
|
|
}
|
|
wsCtx, cancel := context.WithCancel(r.Context())
|
|
|
|
wsConn := websocket.NewConn(wsCtx, NewHTTPResponseReadWriterAcker(w, w.(http.Flusher), r), &log)
|
|
|
|
closedAfter := time.Millisecond * time.Duration(rand.Intn(50))
|
|
originConn := &flakyConn{closeAt: time.Now().Add(closedAfter)}
|
|
stream.Pipe(wsConn, originConn, &log)
|
|
cancel()
|
|
wsConn.Close()
|
|
return nil
|
|
}
|
|
|
|
func originRespEndpoint(w ResponseWriter, status int, data []byte) {
|
|
resp := &http.Response{
|
|
StatusCode: status,
|
|
}
|
|
_ = w.WriteRespHeaders(resp.StatusCode, resp.Header)
|
|
_, _ = w.Write(data)
|
|
}
|
|
|
|
type mockConnectedFuse struct{}
|
|
|
|
func (mcf mockConnectedFuse) Connected() {}
|
|
|
|
func (mcf mockConnectedFuse) IsConnected() bool {
|
|
return true
|
|
}
|