mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 15:49:58 +00:00
TUN-3467: Serialize cf-cloudflared-response-meta during package initialization using jsoniter
This commit is contained in:
@@ -2,13 +2,14 @@ package connection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cloudflare/cloudflared/h2mux"
|
||||
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
||||
@@ -32,14 +33,16 @@ type HTTP2Connection struct {
|
||||
observer *Observer
|
||||
connIndexStr string
|
||||
connIndex uint8
|
||||
shutdownChan chan struct{}
|
||||
wg *sync.WaitGroup
|
||||
connectedFuse ConnectedFuse
|
||||
}
|
||||
|
||||
func NewHTTP2Connection(conn net.Conn, config *Config, originURL *url.URL, namedTunnelConfig *NamedTunnelConfig, connOptions *tunnelpogs.ConnectionOptions, observer *Observer, connIndex uint8, connectedFuse ConnectedFuse) (*HTTP2Connection, error) {
|
||||
return &HTTP2Connection{
|
||||
conn: conn,
|
||||
server: &http2.Server{},
|
||||
conn: conn,
|
||||
server: &http2.Server{
|
||||
MaxConcurrentStreams: math.MaxUint32,
|
||||
},
|
||||
config: config,
|
||||
originURL: originURL,
|
||||
namedTunnel: namedTunnelConfig,
|
||||
@@ -47,7 +50,7 @@ func NewHTTP2Connection(conn net.Conn, config *Config, originURL *url.URL, named
|
||||
observer: observer,
|
||||
connIndexStr: uint8ToString(connIndex),
|
||||
connIndex: connIndex,
|
||||
shutdownChan: make(chan struct{}),
|
||||
wg: &sync.WaitGroup{},
|
||||
connectedFuse: connectedFuse,
|
||||
}, nil
|
||||
}
|
||||
@@ -64,6 +67,9 @@ func (c *HTTP2Connection) Serve(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
c.wg.Add(1)
|
||||
defer c.wg.Done()
|
||||
|
||||
r.URL.Scheme = c.originURL.Scheme
|
||||
r.URL.Host = c.originURL.Host
|
||||
|
||||
@@ -103,9 +109,8 @@ func (c *HTTP2Connection) serveControlStream(ctx context.Context, h2RespWriter *
|
||||
}
|
||||
c.connectedFuse.Connected()
|
||||
|
||||
<-c.shutdownChan
|
||||
<-ctx.Done()
|
||||
c.gracefulShutdown(ctx, rpcClient)
|
||||
close(c.shutdownChan)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -135,10 +140,8 @@ func (c *HTTP2Connection) gracefulShutdown(ctx context.Context, rpcClient *regis
|
||||
}
|
||||
|
||||
func (c *HTTP2Connection) close() {
|
||||
// Send signal to control loop to start graceful shutdown
|
||||
c.shutdownChan <- struct{}{}
|
||||
// Wait for control loop to close channel
|
||||
<-c.shutdownChan
|
||||
// Wait for all serve HTTP handlers to return
|
||||
c.wg.Wait()
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
@@ -168,7 +171,8 @@ func (rp *http2RespWriter) WriteRespHeaders(resp *http.Response) error {
|
||||
}
|
||||
|
||||
// Perform user header serialization and set them in the single header
|
||||
dest.Set(h2mux.ResponseUserHeadersField, h2mux.SerializeHeaders(userHeaders))
|
||||
dest.Set(canonicalResponseUserHeadersField, h2mux.SerializeHeaders(userHeaders))
|
||||
rp.setResponseMetaHeader(responseMetaHeaderCfd)
|
||||
status := resp.StatusCode
|
||||
// HTTP2 removes support for 101 Switching Protocols https://tools.ietf.org/html/rfc7540#section-8.1.1
|
||||
if status == http.StatusSwitchingProtocols {
|
||||
@@ -179,13 +183,14 @@ func (rp *http2RespWriter) WriteRespHeaders(resp *http.Response) error {
|
||||
}
|
||||
|
||||
func (rp *http2RespWriter) WriteErrorResponse(err error) {
|
||||
jsonResponseMetaHeader, err := json.Marshal(h2mux.ResponseMetaHeader{Source: h2mux.ResponseSourceCloudflared})
|
||||
if err == nil {
|
||||
rp.w.Header().Set(h2mux.ResponseMetaHeaderField, string(jsonResponseMetaHeader))
|
||||
}
|
||||
rp.setResponseMetaHeader(responseMetaHeaderCfd)
|
||||
rp.w.WriteHeader(http.StatusBadGateway)
|
||||
}
|
||||
|
||||
func (rp *http2RespWriter) setResponseMetaHeader(value string) {
|
||||
rp.w.Header().Set(canonicalResponseMetaHeaderField, value)
|
||||
}
|
||||
|
||||
func (rp *http2RespWriter) Read(p []byte) (n int, err error) {
|
||||
return rp.r.Read(p)
|
||||
}
|
||||
@@ -195,7 +200,7 @@ func (wr *http2RespWriter) Write(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
type wsRespWriter struct {
|
||||
h2 *http2RespWriter
|
||||
*http2RespWriter
|
||||
flusher http.Flusher
|
||||
}
|
||||
|
||||
@@ -205,34 +210,24 @@ func newWSRespWriter(h2 *http2RespWriter) (*wsRespWriter, error) {
|
||||
return nil, fmt.Errorf("ResponseWriter doesn't implement http.Flusher")
|
||||
}
|
||||
return &wsRespWriter{
|
||||
h2: h2,
|
||||
flusher: flusher,
|
||||
h2,
|
||||
flusher,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (rw *wsRespWriter) WriteRespHeaders(resp *http.Response) error {
|
||||
err := rw.h2.WriteRespHeaders(resp)
|
||||
if err != nil {
|
||||
return err
|
||||
func (rw *wsRespWriter) WriteRespHeaders(resp *http.Response) (err error) {
|
||||
err = rw.http2RespWriter.WriteRespHeaders(resp)
|
||||
if err == nil {
|
||||
rw.flusher.Flush()
|
||||
}
|
||||
rw.flusher.Flush()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rw *wsRespWriter) WriteErrorResponse(err error) {
|
||||
rw.h2.WriteErrorResponse(err)
|
||||
}
|
||||
|
||||
func (rw *wsRespWriter) Read(p []byte) (n int, err error) {
|
||||
return rw.h2.Read(p)
|
||||
return
|
||||
}
|
||||
|
||||
func (rw *wsRespWriter) Write(p []byte) (n int, err error) {
|
||||
n, err = rw.h2.Write(p)
|
||||
if err != nil {
|
||||
return
|
||||
n, err = rw.http2RespWriter.Write(p)
|
||||
if err == nil {
|
||||
rw.flusher.Flush()
|
||||
}
|
||||
rw.flusher.Flush()
|
||||
return
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user