TUN-3615: added support to proxy tcp streams

added ingress.DefaultStreamHandler and a basic test for tcp stream proxy
moved websocket.Stream to ingress
cloudflared no longer picks tcpstream host from header
This commit is contained in:
Sudarsan Reddy
2021-01-11 19:59:45 +00:00
committed by Nuno Diegues
parent e2262085e5
commit 368066a966
14 changed files with 256 additions and 96 deletions

View File

@@ -38,7 +38,7 @@ func NewOriginProxy(ingressRules ingress.Ingress, tags []tunnelpogs.Tag, log *ze
}
}
func (p *proxy) Proxy(w connection.ResponseWriter, req *http.Request, isWebsocket bool) error {
func (p *proxy) Proxy(w connection.ResponseWriter, req *http.Request, sourceConnectionType connection.Type) error {
incrementRequests()
defer decrementConcurrentRequests()
@@ -49,43 +49,50 @@ func (p *proxy) Proxy(w connection.ResponseWriter, req *http.Request, isWebsocke
rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path)
p.logRequest(req, cfRay, lbProbe, ruleNum)
var (
resp *http.Response
err error
)
if isWebsocket {
go websocket.NewConn(w, p.log).Pinger(req.Context())
connClosedChan := make(chan struct{})
err = p.proxyConnection(connClosedChan, w, req, rule)
if err == nil {
respHeader := websocket.NewResponseHeader(req)
status := http.StatusSwitchingProtocols
resp = &http.Response{
Status: http.StatusText(status),
StatusCode: status,
Header: respHeader,
ContentLength: -1,
}
w.WriteRespHeaders(http.StatusSwitchingProtocols, respHeader)
<-connClosedChan
if sourceConnectionType == connection.TypeHTTP {
resp, err := p.proxyHTTP(w, req, rule)
if err != nil {
p.logErrorAndWriteResponse(w, err, cfRay, ruleNum)
return err
}
} else {
resp, err = p.proxyHTTP(w, req, rule)
p.logOriginResponse(resp, cfRay, lbProbe, ruleNum)
return nil
}
respHeader := http.Header{}
if sourceConnectionType == connection.TypeWebsocket {
go websocket.NewConn(w, p.log).Pinger(req.Context())
respHeader = websocket.NewResponseHeader(req)
}
connClosedChan := make(chan struct{})
err := p.proxyConnection(connClosedChan, w, req, rule)
if err != nil {
p.logRequestError(err, cfRay, ruleNum)
w.WriteErrorResponse()
p.logErrorAndWriteResponse(w, err, cfRay, ruleNum)
return err
}
p.logOriginResponse(resp, cfRay, lbProbe, ruleNum)
status := http.StatusSwitchingProtocols
resp := &http.Response{
Status: http.StatusText(status),
StatusCode: status,
Header: respHeader,
ContentLength: -1,
}
w.WriteRespHeaders(http.StatusSwitchingProtocols, nil)
<-connClosedChan
p.logOriginResponse(resp, cfRay, lbProbe, ruleNum)
return nil
}
func (p *proxy) logErrorAndWriteResponse(w connection.ResponseWriter, err error, cfRay string, ruleNum int) {
p.logRequestError(err, cfRay, ruleNum)
w.WriteErrorResponse()
}
func (p *proxy) proxyHTTP(w connection.ResponseWriter, req *http.Request, rule *ingress.Rule) (*http.Response, error) {
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
if rule.Config.DisableChunkedEncoding {

View File

@@ -143,7 +143,7 @@ func testProxyHTTP(t *testing.T, proxy connection.OriginProxy) func(t *testing.T
req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", nil)
require.NoError(t, err)
err = proxy.Proxy(respWriter, req, false)
err = proxy.Proxy(respWriter, req, connection.TypeHTTP)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, respWriter.Code)
@@ -163,7 +163,7 @@ func testProxyWebsocket(t *testing.T, proxy connection.OriginProxy) func(t *test
wg.Add(1)
go func() {
defer wg.Done()
err = proxy.Proxy(respWriter, req, true)
err = proxy.Proxy(respWriter, req, connection.TypeWebsocket)
require.NoError(t, err)
require.Equal(t, http.StatusSwitchingProtocols, respWriter.Code)
@@ -205,7 +205,7 @@ func testProxySSE(t *testing.T, proxy connection.OriginProxy) func(t *testing.T)
wg.Add(1)
go func() {
defer wg.Done()
err = proxy.Proxy(respWriter, req, false)
err = proxy.Proxy(respWriter, req, connection.TypeHTTP)
require.NoError(t, err)
require.Equal(t, http.StatusOK, respWriter.Code)
@@ -298,7 +298,7 @@ func TestProxyMultipleOrigins(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, test.url, nil)
require.NoError(t, err)
err = proxy.Proxy(respWriter, req, false)
err = proxy.Proxy(respWriter, req, connection.TypeHTTP)
require.NoError(t, err)
assert.Equal(t, test.expectedStatus, respWriter.Code)
@@ -346,7 +346,7 @@ func TestProxyError(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1", nil)
assert.NoError(t, err)
err = proxy.Proxy(respWriter, req, false)
err = proxy.Proxy(respWriter, req, connection.TypeHTTP)
assert.Error(t, err)
assert.Equal(t, http.StatusBadGateway, respWriter.Code)
assert.Equal(t, "http response error", respWriter.Body.String())
@@ -376,12 +376,10 @@ func TestProxyBastionMode(t *testing.T) {
t.Run("testBastionWebsocket", testBastionWebsocket(proxy))
cancel()
}
func testBastionWebsocket(proxy connection.OriginProxy) func(t *testing.T) {
return func(t *testing.T) {
// WSRoute is a websocket echo handler
ctx, cancel := context.WithCancel(context.Background())
readPipe, _ := io.Pipe()
respWriter := newMockWSRespWriter(readPipe)
@@ -389,14 +387,15 @@ func testBastionWebsocket(proxy connection.OriginProxy) func(t *testing.T) {
var wg sync.WaitGroup
msgFromConn := []byte("data from websocket proxy")
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
wg.Add(1)
go func() {
defer wg.Done()
defer ln.Close()
server, err := ln.Accept()
conn, err := ln.Accept()
require.NoError(t, err)
conn := websocket.NewConn(server, nil)
conn.Write(msgFromConn)
wsConn := websocket.NewConn(conn, nil)
wsConn.Write(msgFromConn)
}()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://dummy", nil)
@@ -405,7 +404,7 @@ func testBastionWebsocket(proxy connection.OriginProxy) func(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err = proxy.Proxy(respWriter, req, true)
err = proxy.Proxy(respWriter, req, connection.TypeWebsocket)
require.NoError(t, err)
require.Equal(t, http.StatusSwitchingProtocols, respWriter.Code)
@@ -422,3 +421,92 @@ func testBastionWebsocket(proxy connection.OriginProxy) func(t *testing.T) {
wg.Wait()
}
}
func TestTCPStream(t *testing.T) {
logger := logger.Create(nil)
ctx, cancel := context.WithCancel(context.Background())
ingressConfig := &config.Configuration{
Ingress: []config.UnvalidatedIngressRule{
config.UnvalidatedIngressRule{
Hostname: "*",
Service: ingress.ServiceTeamnet,
},
},
}
ingressRule, err := ingress.ParseIngress(ingressConfig)
require.NoError(t, err)
var wg sync.WaitGroup
errC := make(chan error)
ingressRule.StartOrigins(&wg, logger, ctx.Done(), errC)
proxy := NewOriginProxy(ingressRule, testTags, logger)
t.Run("testTCPStream", testTCPStreamProxy(proxy))
cancel()
wg.Wait()
}
type mockTCPRespWriter struct {
w io.Writer
code int
}
func (m *mockTCPRespWriter) Read(p []byte) (n int, err error) {
return len(p), nil
}
func (m *mockTCPRespWriter) Write(p []byte) (n int, err error) {
return m.w.Write(p)
}
func (m *mockTCPRespWriter) WriteErrorResponse() {
}
func (m *mockTCPRespWriter) WriteRespHeaders(status int, header http.Header) error {
m.code = status
return nil
}
func testTCPStreamProxy(proxy connection.OriginProxy) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
readPipe, writePipe := io.Pipe()
respWriter := &mockTCPRespWriter{
w: writePipe,
}
msgFromConn := []byte("data from tcp proxy")
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
go func() {
defer ln.Close()
conn, err := ln.Accept()
require.NoError(t, err)
defer conn.Close()
_, err = conn.Write(msgFromConn)
require.NoError(t, err)
}()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://dummy", nil)
require.NoError(t, err)
req.Header.Set("Cf-Cloudflared-Proxy-Src", "non-blank-value")
req.Host = ln.Addr().String()
err = proxy.Proxy(respWriter, req, connection.TypeTCP)
require.NoError(t, err)
require.Equal(t, http.StatusSwitchingProtocols, respWriter.code)
returnedMsg := make([]byte, len(msgFromConn))
_, err = readPipe.Read(returnedMsg)
require.NoError(t, err)
require.Equal(t, msgFromConn, returnedMsg)
cancel()
}
}