TUN-3470: Replace in-house logger calls with zerolog

This commit is contained in:
Areg Harutyunyan
2020-11-25 00:55:13 -06:00
committed by Adam Chalmers
parent 06404bf3e8
commit 870f5fa907
151 changed files with 7120 additions and 3365 deletions

View File

@@ -8,11 +8,10 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"golang.org/x/sync/errgroup"
"github.com/cloudflare/cloudflared/logger"
)
const (
@@ -50,7 +49,7 @@ type MuxerConfig struct {
// The minimum number of heartbeats to send before terminating the connection.
MaxHeartbeats uint64
// Logger to use
Logger logger.Service
Log *zerolog.Logger
CompressionQuality CompressionSetting
// Initial size for HTTP2 flow control windows
DefaultWindowSize uint32
@@ -138,10 +137,10 @@ func Handshake(
handshakeSetting := http2.Setting{ID: SettingMuxerMagic, Val: MuxerMagicEdge}
compressionSetting := http2.Setting{ID: SettingCompression, Val: config.CompressionQuality.toH2Setting()}
if CompressionIsSupported() {
config.Logger.Debug("muxer: Compression is supported")
config.Log.Debug().Msg("muxer: Compression is supported")
m.compressionQuality = config.CompressionQuality.getPreset()
} else {
config.Logger.Debug("muxer: Compression is not supported")
config.Log.Debug().Msg("muxer: Compression is not supported")
compressionSetting = http2.Setting{ID: SettingCompression, Val: 0}
}
@@ -178,12 +177,12 @@ func Handshake(
// Sanity check to enusre idelDuration is sane
if idleDuration == 0 || idleDuration < defaultTimeout {
idleDuration = defaultTimeout
config.Logger.Infof("muxer: Minimum idle time has been adjusted to %d", defaultTimeout)
config.Log.Info().Msgf("muxer: Minimum idle time has been adjusted to %d", defaultTimeout)
}
maxRetries := config.MaxHeartbeats
if maxRetries == 0 {
maxRetries = defaultRetries
config.Logger.Infof("muxer: Minimum number of unacked heartbeats to send before closing the connection has been adjusted to %d", maxRetries)
config.Log.Info().Msgf("muxer: Minimum number of unacked heartbeats to send before closing the connection has been adjusted to %d", maxRetries)
}
compBytesBefore, compBytesAfter := NewAtomicCounter(0), NewAtomicCounter(0)
@@ -325,7 +324,7 @@ func (m *Muxer) Serve(ctx context.Context) error {
errGroup.Go(func() error {
ch := make(chan error)
go func() {
err := m.muxReader.run(m.config.Logger)
err := m.muxReader.run(m.config.Log)
m.explicitShutdown.Fuse(false)
m.r.Close()
m.abort()
@@ -346,7 +345,7 @@ func (m *Muxer) Serve(ctx context.Context) error {
errGroup.Go(func() error {
ch := make(chan error)
go func() {
err := m.muxWriter.run(m.config.Logger)
err := m.muxWriter.run(m.config.Log)
m.explicitShutdown.Fuse(false)
m.w.Close()
m.abort()
@@ -367,7 +366,7 @@ func (m *Muxer) Serve(ctx context.Context) error {
errGroup.Go(func() error {
ch := make(chan error)
go func() {
err := m.muxMetricsUpdater.run(m.config.Logger)
err := m.muxMetricsUpdater.run(m.config.Log)
// don't block if parent goroutine quit early
select {
case ch <- err:

View File

@@ -16,10 +16,9 @@ import (
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
"github.com/cloudflare/cloudflared/logger"
)
const (
@@ -27,6 +26,8 @@ const (
testHandshakeTimeout = time.Millisecond * 1000
)
var log = zerolog.Nop()
func TestMain(m *testing.M) {
if os.Getenv("VERBOSE") == "1" {
//TODO: set log level
@@ -52,7 +53,7 @@ func NewDefaultMuxerPair(t assert.TestingT, testName string, f MuxedStreamFunc)
Handler: f,
IsClient: true,
Name: "origin",
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
Log: &log,
DefaultWindowSize: (1 << 8) - 1,
MaxWindowSize: (1 << 15) - 1,
StreamWriteBufferMaxLen: 1024,
@@ -64,7 +65,7 @@ func NewDefaultMuxerPair(t assert.TestingT, testName string, f MuxedStreamFunc)
Timeout: testHandshakeTimeout,
IsClient: false,
Name: "edge",
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
Log: &log,
DefaultWindowSize: (1 << 8) - 1,
MaxWindowSize: (1 << 15) - 1,
StreamWriteBufferMaxLen: 1024,
@@ -87,7 +88,7 @@ func NewCompressedMuxerPair(t assert.TestingT, testName string, quality Compress
IsClient: true,
Name: "origin",
CompressionQuality: quality,
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
Log: &log,
HeartbeatInterval: defaultTimeout,
MaxHeartbeats: defaultRetries,
},
@@ -97,7 +98,7 @@ func NewCompressedMuxerPair(t assert.TestingT, testName string, quality Compress
IsClient: false,
Name: "edge",
CompressionQuality: quality,
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
Log: &log,
HeartbeatInterval: defaultTimeout,
MaxHeartbeats: defaultRetries,
},
@@ -186,11 +187,11 @@ func TestSingleStream(t *testing.T) {
if stream.Headers[0].Value != "headerValue" {
t.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
}
stream.WriteHeaders([]Header{
_ = stream.WriteHeaders([]Header{
{Name: "response-header", Value: "responseValue"},
})
buf := []byte("Hello world")
stream.Write(buf)
_, _ = stream.Write(buf)
n, err := io.ReadFull(stream, buf)
if n > 0 {
t.Fatalf("read %d bytes after EOF", n)
@@ -230,7 +231,7 @@ func TestSingleStream(t *testing.T) {
if string(responseBody) != "Hello world" {
t.Fatalf("expected response body %s, got %s", "Hello world", responseBody)
}
stream.Close()
_ = stream.Close()
n, err = stream.Write([]byte("aaaaa"))
if n > 0 {
t.Fatalf("wrote %d bytes after EOF", n)
@@ -252,7 +253,7 @@ func TestSingleStreamLargeResponseBody(t *testing.T) {
if stream.Headers[0].Value != "headerValue" {
t.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
}
stream.WriteHeaders([]Header{
_ = stream.WriteHeaders([]Header{
{Name: "response-header", Value: "responseValue"},
})
payload := make([]byte, bodySize)
@@ -302,7 +303,6 @@ func TestSingleStreamLargeResponseBody(t *testing.T) {
}
func TestMultipleStreams(t *testing.T) {
l := logger.NewOutputWriter(logger.NewMockWriteManager())
f := MuxedStreamFunc(func(stream *MuxedStream) error {
if len(stream.Headers) != 1 {
t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
@@ -310,13 +310,13 @@ func TestMultipleStreams(t *testing.T) {
if stream.Headers[0].Name != "client-token" {
t.Fatalf("expected header name %s, got %s", "client-token", stream.Headers[0].Name)
}
l.Debugf("Got request for stream %s", stream.Headers[0].Value)
stream.WriteHeaders([]Header{
log.Debug().Msgf("Got request for stream %s", stream.Headers[0].Value)
_ = stream.WriteHeaders([]Header{
{Name: "response-token", Value: stream.Headers[0].Value},
})
l.Debugf("Wrote headers for stream %s", stream.Headers[0].Value)
stream.Write([]byte("OK"))
l.Debugf("Wrote body for stream %s", stream.Headers[0].Value)
log.Debug().Msgf("Wrote headers for stream %s", stream.Headers[0].Value)
_, _ = stream.Write([]byte("OK"))
log.Debug().Msgf("Wrote body for stream %s", stream.Headers[0].Value)
return nil
})
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
@@ -334,7 +334,7 @@ func TestMultipleStreams(t *testing.T) {
[]Header{{Name: "client-token", Value: tokenString}},
nil,
)
l.Debugf("Got headers for stream %d", tokenId)
log.Debug().Msgf("Got headers for stream %d", tokenId)
if err != nil {
errorsC <- err
return
@@ -372,7 +372,7 @@ func TestMultipleStreams(t *testing.T) {
testFail := false
for err := range errorsC {
testFail = true
l.Errorf("%s", err)
log.Error().Msgf("%s", err)
}
if testFail {
t.Fatalf("TestMultipleStreams failed")
@@ -396,7 +396,7 @@ func TestMultipleStreamsFlowControl(t *testing.T) {
if stream.Headers[0].Value != "headerValue" {
t.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
}
stream.WriteHeaders([]Header{
_ = stream.WriteHeaders([]Header{
{Name: "response-header", Value: "responseValue"},
})
payload := make([]byte, responseSizes[(stream.streamID-2)/2])
@@ -450,27 +450,25 @@ func TestMultipleStreamsFlowControl(t *testing.T) {
}
func TestGracefulShutdown(t *testing.T) {
l := logger.NewOutputWriter(logger.NewMockWriteManager())
sendC := make(chan struct{})
responseBuf := bytes.Repeat([]byte("Hello world"), 65536)
f := MuxedStreamFunc(func(stream *MuxedStream) error {
stream.WriteHeaders([]Header{
_ = stream.WriteHeaders([]Header{
{Name: "response-header", Value: "responseValue"},
})
<-sendC
l.Debugf("Writing %d bytes", len(responseBuf))
stream.Write(responseBuf)
stream.CloseWrite()
l.Debugf("Wrote %d bytes", len(responseBuf))
log.Debug().Msgf("Writing %d bytes", len(responseBuf))
_, _ = stream.Write(responseBuf)
_ = stream.CloseWrite()
log.Debug().Msgf("Wrote %d bytes", len(responseBuf))
// Reading from the stream will block until the edge closes its end of the stream.
// Otherwise, we'll close the whole connection before receiving the 'stream closed'
// message from the edge.
// Graceful shutdown works if you omit this, it just gives spurious errors for now -
// TODO ignore errors when writing 'stream closed' and we're shutting down.
stream.Read([]byte{0})
l.Debugf("Handler ends")
_, _ = stream.Read([]byte{0})
log.Debug().Msgf("Handler ends")
return nil
})
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
@@ -487,7 +485,7 @@ func TestGracefulShutdown(t *testing.T) {
muxPair.EdgeMux.Shutdown()
close(sendC)
responseBody := make([]byte, len(responseBuf))
l.Debugf("Waiting for %d bytes", len(responseBuf))
log.Debug().Msgf("Waiting for %d bytes", len(responseBuf))
n, err := io.ReadFull(stream, responseBody)
if err != nil {
t.Fatalf("error from (*MuxedStream).Read with %d bytes read: %s", n, err)
@@ -498,7 +496,7 @@ func TestGracefulShutdown(t *testing.T) {
if !bytes.Equal(responseBuf, responseBody) {
t.Fatalf("response body mismatch")
}
stream.Close()
_ = stream.Close()
muxPair.Wait(t)
}
@@ -509,7 +507,7 @@ func TestUnexpectedShutdown(t *testing.T) {
f := MuxedStreamFunc(func(stream *MuxedStream) error {
defer close(handlerFinishC)
stream.WriteHeaders([]Header{
_ = stream.WriteHeaders([]Header{
{Name: "response-header", Value: "responseValue"},
})
<-sendC
@@ -536,7 +534,7 @@ func TestUnexpectedShutdown(t *testing.T) {
nil,
)
// Close the underlying connection before telling the origin to write.
muxPair.EdgeConn.Close()
_ = muxPair.EdgeConn.Close()
close(sendC)
if err != nil {
t.Fatalf("error in OpenStream: %s", err)
@@ -559,18 +557,18 @@ func TestUnexpectedShutdown(t *testing.T) {
func EchoHandler(stream *MuxedStream) error {
var buf bytes.Buffer
fmt.Fprintf(&buf, "Hello, world!\n\n# REQUEST HEADERS:\n\n")
_, _ = fmt.Fprintf(&buf, "Hello, world!\n\n# REQUEST HEADERS:\n\n")
for _, header := range stream.Headers {
fmt.Fprintf(&buf, "[%s] = %s\n", header.Name, header.Value)
_, _ = fmt.Fprintf(&buf, "[%s] = %s\n", header.Name, header.Value)
}
stream.WriteHeaders([]Header{
_ = stream.WriteHeaders([]Header{
{Name: ":status", Value: "200"},
{Name: "server", Value: "Echo-server/1.0"},
{Name: "date", Value: time.Now().Format(time.RFC850)},
{Name: "content-type", Value: "text/html; charset=utf-8"},
{Name: "content-length", Value: strconv.Itoa(buf.Len())},
})
buf.WriteTo(stream)
_, _ = buf.WriteTo(stream)
return nil
}
@@ -582,14 +580,14 @@ func TestOpenAfterDisconnect(t *testing.T) {
switch i {
case 0:
// Close both directions of the connection to cause EOF on both peers.
muxPair.OriginConn.Close()
muxPair.EdgeConn.Close()
_ = muxPair.OriginConn.Close()
_ = muxPair.EdgeConn.Close()
case 1:
// Close origin conn to cause EOF on origin first.
muxPair.OriginConn.Close()
_ = muxPair.OriginConn.Close()
case 2:
// Close edge conn to cause EOF on edge first.
muxPair.EdgeConn.Close()
_ = muxPair.EdgeConn.Close()
}
_, err := muxPair.OpenEdgeMuxStream(
@@ -617,7 +615,7 @@ func TestHPACK(t *testing.T) {
if err != nil {
t.Fatalf("error in OpenStream: %s", err)
}
stream.Close()
_ = stream.Close()
for i := 0; i < 3; i++ {
stream, err := muxPair.OpenEdgeMuxStream(
@@ -654,8 +652,8 @@ func TestHPACK(t *testing.T) {
if stream.Headers[0].Value != "200" {
t.Fatalf("expected status 200, got %s", stream.Headers[0].Value)
}
ioutil.ReadAll(stream)
stream.Close()
_, _ = ioutil.ReadAll(stream)
_ = stream.Close()
}
}
@@ -680,7 +678,7 @@ func AssertIfPipeReadable(t *testing.T, pipe io.ReadCloser) {
}
func TestMultipleStreamsWithDictionaries(t *testing.T) {
l := logger.NewOutputWriter(logger.NewMockWriteManager())
l := zerolog.Nop()
for q := CompressionNone; q <= CompressionMax; q++ {
htmlBody := `<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"` +
@@ -730,10 +728,10 @@ func TestMultipleStreamsWithDictionaries(t *testing.T) {
contentType = "img/gif"
}
stream.WriteHeaders([]Header{
Header{Name: "content-type", Value: contentType},
_ = stream.WriteHeaders([]Header{
{Name: "content-type", Value: contentType},
})
stream.Write([]byte(strings.Replace(htmlBody, "paragraph", pathHeader.Value, 1) + stream.Headers[5].Value))
_, _ = stream.Write([]byte(strings.Replace(htmlBody, "paragraph", pathHeader.Value, 1) + stream.Headers[5].Value))
return nil
})
@@ -817,7 +815,7 @@ func TestMultipleStreamsWithDictionaries(t *testing.T) {
testFail := false
for err := range errorsC {
testFail = true
l.Errorf("%s", err)
l.Error().Msgf("%s", err)
}
if testFail {
t.Fatalf("TestMultipleStreams failed")
@@ -831,8 +829,6 @@ func TestMultipleStreamsWithDictionaries(t *testing.T) {
}
func sampleSiteHandler(files map[string][]byte) MuxedStreamFunc {
l := logger.NewOutputWriter(logger.NewMockWriteManager())
return func(stream *MuxedStream) error {
var contentType string
var pathHeader Header
@@ -857,16 +853,16 @@ func sampleSiteHandler(files map[string][]byte) MuxedStreamFunc {
} else {
contentType = "img/gif"
}
stream.WriteHeaders([]Header{
Header{Name: "content-type", Value: contentType},
_ = stream.WriteHeaders([]Header{
{Name: "content-type", Value: contentType},
})
l.Debugf("Wrote headers for stream %s", pathHeader.Value)
log.Debug().Msgf("Wrote headers for stream %s", pathHeader.Value)
file, ok := files[pathHeader.Value]
if !ok {
return fmt.Errorf("%s content is not preloaded", pathHeader.Value)
}
stream.Write(file)
l.Debugf("Wrote body for stream %s", pathHeader.Value)
_, _ = stream.Write(file)
log.Debug().Msgf("Wrote body for stream %s", pathHeader.Value)
return nil
}
}
@@ -1008,7 +1004,7 @@ func BenchmarkOpenStream(b *testing.B) {
if stream.Headers[0].Value != "headerValue" {
b.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
}
stream.WriteHeaders([]Header{
_ = stream.WriteHeaders([]Header{
{Name: "response-header", Value: "responseValue"},
})
return nil
@@ -1058,7 +1054,7 @@ func BenchmarkSingleStreamLargeResponseBody(b *testing.B) {
if stream.Headers[0].Value != "headerValue" {
b.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
}
stream.WriteHeaders([]Header{
_ = stream.WriteHeaders([]Header{
{Name: "response-header", Value: "responseValue"},
})
for i := 0; i < writeN; i++ {
@@ -1083,7 +1079,7 @@ func BenchmarkSingleStreamLargeResponseBody(b *testing.B) {
Handler: f,
IsClient: true,
Name: "origin",
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
Log: &log,
DefaultWindowSize: defaultWindowSize,
MaxWindowSize: maxWindowSize,
StreamWriteBufferMaxLen: defaultWriteBufferMaxLen,
@@ -1095,7 +1091,7 @@ func BenchmarkSingleStreamLargeResponseBody(b *testing.B) {
Timeout: testHandshakeTimeout,
IsClient: false,
Name: "edge",
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
Log: &log,
DefaultWindowSize: defaultWindowSize,
MaxWindowSize: maxWindowSize,
StreamWriteBufferMaxLen: defaultWriteBufferMaxLen,

View File

@@ -4,8 +4,8 @@ import (
"sync"
"time"
"github.com/cloudflare/cloudflared/logger"
"github.com/golang-collections/collections/queue"
"github.com/rs/zerolog"
)
// data points used to compute average receive window and send window size
@@ -20,7 +20,7 @@ type muxMetricsUpdater interface {
// metrics returns the latest metrics
metrics() *MuxerMetrics
// run is a blocking call to start the event loop
run(logger logger.Service) error
run(log *zerolog.Logger) error
// updateRTTChan is called by muxReader to report new RTT measurements
updateRTT(rtt *roundTripMeasurement)
//updateReceiveWindowChan is called by muxReader and muxWriter when receiveWindow size is updated
@@ -137,30 +137,30 @@ func (updater *muxMetricsUpdaterImpl) metrics() *MuxerMetrics {
return m
}
func (updater *muxMetricsUpdaterImpl) run(logger logger.Service) error {
defer logger.Debug("mux - metrics: event loop finished")
func (updater *muxMetricsUpdaterImpl) run(log *zerolog.Logger) error {
defer log.Debug().Msg("mux - metrics: event loop finished")
for {
select {
case <-updater.abortChan:
logger.Infof("mux - metrics: Stopping mux metrics updater")
log.Info().Msgf("mux - metrics: Stopping mux metrics updater")
return nil
case roundTripMeasurement := <-updater.updateRTTChan:
go updater.rttData.update(roundTripMeasurement)
logger.Debug("mux - metrics: Update rtt")
log.Debug().Msg("mux - metrics: Update rtt")
case receiveWindow := <-updater.updateReceiveWindowChan:
go updater.receiveWindowData.update(receiveWindow)
logger.Debug("mux - metrics: Update receive window")
log.Debug().Msg("mux - metrics: Update receive window")
case sendWindow := <-updater.updateSendWindowChan:
go updater.sendWindowData.update(sendWindow)
logger.Debug("mux - metrics: Update send window")
log.Debug().Msg("mux - metrics: Update send window")
case inBoundBytes := <-updater.updateInBoundBytesChan:
// inBoundBytes is bytes/sec because the update interval is 1 sec
go updater.inBoundRate.update(inBoundBytes)
logger.Debugf("mux - metrics: Inbound bytes %d", inBoundBytes)
log.Debug().Msgf("mux - metrics: Inbound bytes %d", inBoundBytes)
case outBoundBytes := <-updater.updateOutBoundBytesChan:
// outBoundBytes is bytes/sec because the update interval is 1 sec
go updater.outBoundRate.update(outBoundBytes)
logger.Debugf("mux - metrics: Outbound bytes %d", outBoundBytes)
log.Debug().Msgf("mux - metrics: Outbound bytes %d", outBoundBytes)
}
}
}

View File

@@ -5,7 +5,7 @@ import (
"testing"
"time"
"github.com/cloudflare/cloudflared/logger"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)
@@ -91,10 +91,10 @@ func TestMuxMetricsUpdater(t *testing.T) {
abortChan := make(chan struct{})
compBefore, compAfter := NewAtomicCounter(0), NewAtomicCounter(0)
m := newMuxMetricsUpdater(abortChan, compBefore, compAfter)
logger := logger.NewOutputWriter(logger.NewMockWriteManager())
log := zerolog.Nop()
go func() {
errChan <- m.run(logger)
errChan <- m.run(&log)
}()
var wg sync.WaitGroup

View File

@@ -8,7 +8,7 @@ import (
"net/url"
"time"
"github.com/cloudflare/cloudflared/logger"
"github.com/rs/zerolog"
"golang.org/x/net/http2"
)
@@ -68,8 +68,8 @@ func (r *MuxReader) Shutdown() <-chan struct{} {
return done
}
func (r *MuxReader) run(logger logger.Service) error {
defer logger.Debug("mux - read: event loop finished")
func (r *MuxReader) run(log *zerolog.Logger) error {
defer log.Debug().Msg("mux - read: event loop finished")
// routine to periodically update bytesRead
go func() {
@@ -93,7 +93,7 @@ func (r *MuxReader) run(logger logger.Service) error {
}
switch e := err.(type) {
case http2.StreamError:
logger.Infof("%s: stream error", errorString)
log.Info().Msgf("%s: stream error", errorString)
// Ideally we wouldn't return here, since that aborts the muxer.
// We should communicate the error to the relevant MuxedStream
// data structure, so that callers of MuxedStream.Read() and
@@ -101,28 +101,28 @@ func (r *MuxReader) run(logger logger.Service) error {
// and keep the muxer going.
return r.streamError(e.StreamID, e.Code)
case http2.ConnectionError:
logger.Infof("%s: stream error", errorString)
log.Info().Msgf("%s: stream error", errorString)
return r.connectionError(err)
default:
if isConnectionClosedError(err) {
if r.streams.Len() == 0 {
// don't log the error here -- that would just be extra noise
logger.Debug("mux - read: shutting down")
log.Debug().Msg("mux - read: shutting down")
return nil
}
logger.Infof("%s: connection closed unexpectedly", errorString)
log.Info().Msgf("%s: connection closed unexpectedly", errorString)
return err
} else {
logger.Infof("%s: frame read error", errorString)
log.Info().Msgf("%s: frame read error", errorString)
return r.connectionError(err)
}
}
}
r.connActive.Signal()
logger.Debugf("mux - read: read frame: data %v", frame)
log.Debug().Msgf("mux - read: read frame: data %v", frame)
switch f := frame.(type) {
case *http2.DataFrame:
err = r.receiveFrameData(f, logger)
err = r.receiveFrameData(f, log)
case *http2.MetaHeadersFrame:
err = r.receiveHeaderData(f)
case *http2.RSTStreamFrame:
@@ -155,7 +155,7 @@ func (r *MuxReader) run(logger logger.Service) error {
err = ErrUnexpectedFrameType
}
if err != nil {
logger.Debugf("mux - read: read error: data %v", frame)
log.Debug().Msgf("mux - read: read error: data %v", frame)
return r.connectionError(err)
}
}
@@ -276,7 +276,7 @@ func (r *MuxReader) handleStream(stream *MuxedStream) {
}
// Receives a data frame from a stream. A non-nil error is a connection error.
func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, logger logger.Service) error {
func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, log *zerolog.Logger) error {
stream, err := r.getStreamForFrame(frame)
if err != nil {
return r.defaultStreamErrorHandler(err, frame.Header())
@@ -292,9 +292,9 @@ func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, logger logger.Servi
if frame.Header().Flags.Has(http2.FlagDataEndStream) {
if stream.receiveEOF() {
r.streams.Delete(stream.streamID)
logger.Debugf("mux - read: stream closed: streamID: %d", frame.Header().StreamID)
log.Debug().Msgf("mux - read: stream closed: streamID: %d", frame.Header().StreamID)
} else {
logger.Debugf("mux - read: shutdown receive side: streamID: %d", frame.Header().StreamID)
log.Debug().Msgf("mux - read: shutdown receive side: streamID: %d", frame.Header().StreamID)
}
return nil
}

View File

@@ -3,10 +3,10 @@ package h2mux
import (
"bytes"
"encoding/binary"
"github.com/rs/zerolog"
"io"
"time"
"github.com/cloudflare/cloudflared/logger"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
@@ -72,8 +72,8 @@ func tsToPingData(ts int64) [8]byte {
return pingData
}
func (w *MuxWriter) run(logger logger.Service) error {
defer logger.Debug("mux - write: event loop finished")
func (w *MuxWriter) run(log *zerolog.Logger) error {
defer log.Debug().Msg("mux - write: event loop finished")
// routine to periodically communicate bytesWrote
go func() {
@@ -91,17 +91,17 @@ func (w *MuxWriter) run(logger logger.Service) error {
for {
select {
case <-w.abortChan:
logger.Debug("mux - write: aborting writer thread")
log.Debug().Msg("mux - write: aborting writer thread")
return nil
case errCode := <-w.goAwayChan:
logger.Debugf("mux - write: sending GOAWAY code %v", errCode)
log.Debug().Msgf("mux - write: sending GOAWAY code %v", errCode)
err := w.f.WriteGoAway(w.streams.LastPeerStreamID(), errCode, []byte{})
if err != nil {
return err
}
w.idleTimer.MarkActive()
case <-w.pingTimestamp.GetUpdateChan():
logger.Debug("mux - write: sending PING ACK")
log.Debug().Msg("mux - write: sending PING ACK")
err := w.f.WritePing(true, tsToPingData(w.pingTimestamp.Get()))
if err != nil {
return err
@@ -111,7 +111,7 @@ func (w *MuxWriter) run(logger logger.Service) error {
if !w.idleTimer.Retry() {
return ErrConnectionDropped
}
logger.Debug("mux - write: sending PING")
log.Debug().Msg("mux - write: sending PING")
err := w.f.WritePing(false, tsToPingData(time.Now().UnixNano()))
if err != nil {
return err
@@ -121,7 +121,7 @@ func (w *MuxWriter) run(logger logger.Service) error {
w.idleTimer.MarkActive()
case <-w.streamErrors.GetSignalChan():
for streamID, errCode := range w.streamErrors.GetErrors() {
logger.Debugf("mux - write: resetting stream with code: %v streamID: %d", errCode, streamID)
log.Debug().Msgf("mux - write: resetting stream with code: %v streamID: %d", errCode, streamID)
err := w.f.WriteRSTStream(streamID, errCode)
if err != nil {
return err
@@ -141,7 +141,7 @@ func (w *MuxWriter) run(logger logger.Service) error {
if streamRequest.body != nil {
go streamRequest.flushBody()
}
err := w.writeStreamData(streamRequest.stream, logger)
err := w.writeStreamData(streamRequest.stream, log)
if err != nil {
return err
}
@@ -151,7 +151,7 @@ func (w *MuxWriter) run(logger logger.Service) error {
if !ok {
continue
}
err := w.writeStreamData(stream, logger)
err := w.writeStreamData(stream, log)
if err != nil {
return err
}
@@ -159,7 +159,7 @@ func (w *MuxWriter) run(logger logger.Service) error {
case useDict := <-w.useDictChan:
err := w.writeUseDictionary(useDict)
if err != nil {
logger.Errorf("mux - write: error writing use dictionary: %s", err)
log.Error().Msgf("mux - write: error writing use dictionary: %s", err)
return err
}
w.idleTimer.MarkActive()
@@ -167,18 +167,18 @@ func (w *MuxWriter) run(logger logger.Service) error {
}
}
func (w *MuxWriter) writeStreamData(stream *MuxedStream, logger logger.Service) error {
logger.Debugf("mux - write: writable: streamID: %d", stream.streamID)
func (w *MuxWriter) writeStreamData(stream *MuxedStream, log *zerolog.Logger) error {
log.Debug().Msgf("mux - write: writable: streamID: %d", stream.streamID)
chunk := stream.getChunk()
w.metricsUpdater.updateReceiveWindow(stream.getReceiveWindow())
w.metricsUpdater.updateSendWindow(stream.getSendWindow())
if chunk.sendHeadersFrame() {
err := w.writeHeaders(chunk.streamID, chunk.headers)
if err != nil {
logger.Errorf("mux - write: error writing headers: %s: streamID: %d", err, stream.streamID)
log.Error().Msgf("mux - write: error writing headers: %s: streamID: %d", err, stream.streamID)
return err
}
logger.Debugf("mux - write: output headers: streamID: %d", stream.streamID)
log.Debug().Msgf("mux - write: output headers: streamID: %d", stream.streamID)
}
if chunk.sendWindowUpdateFrame() {
@@ -189,22 +189,22 @@ func (w *MuxWriter) writeStreamData(stream *MuxedStream, logger logger.Service)
// window, unless the receiver treats this as a connection error"
err := w.f.WriteWindowUpdate(chunk.streamID, chunk.windowUpdate)
if err != nil {
logger.Errorf("mux - write: error writing window update: %s: streamID: %d", err, stream.streamID)
log.Error().Msgf("mux - write: error writing window update: %s: streamID: %d", err, stream.streamID)
return err
}
logger.Debugf("mux - write: increment receive window by %d streamID: %d", chunk.windowUpdate, stream.streamID)
log.Debug().Msgf("mux - write: increment receive window by %d streamID: %d", chunk.windowUpdate, stream.streamID)
}
for chunk.sendDataFrame() {
payload, sentEOF := chunk.nextDataFrame(int(w.maxFrameSize))
err := w.f.WriteData(chunk.streamID, sentEOF, payload)
if err != nil {
logger.Errorf("mux - write: error writing data: %s: streamID: %d", err, stream.streamID)
log.Error().Msgf("mux - write: error writing data: %s: streamID: %d", err, stream.streamID)
return err
}
// update the amount of data wrote
w.bytesWrote.IncrementBy(uint64(len(payload)))
logger.Debugf("mux - write: output data: %d: streamID: %d", len(payload), stream.streamID)
log.Debug().Msgf("mux - write: output data: %d: streamID: %d", len(payload), stream.streamID)
if sentEOF {
if stream.readBuffer.Closed() {
@@ -212,15 +212,15 @@ func (w *MuxWriter) writeStreamData(stream *MuxedStream, logger logger.Service)
if !stream.gotReceiveEOF() {
// the peer may send data that we no longer want to receive. Force them into the
// closed state.
logger.Debugf("mux - write: resetting stream: streamID: %d", stream.streamID)
log.Debug().Msgf("mux - write: resetting stream: streamID: %d", stream.streamID)
w.f.WriteRSTStream(chunk.streamID, http2.ErrCodeNo)
} else {
// Half-open stream transitioned into closed
logger.Debugf("mux - write: closing stream: streamID: %d", stream.streamID)
log.Debug().Msgf("mux - write: closing stream: streamID: %d", stream.streamID)
}
w.streams.Delete(chunk.streamID)
} else {
logger.Debugf("mux - write: closing stream write side: streamID: %d", stream.streamID)
log.Debug().Msgf("mux - write: closing stream write side: streamID: %d", stream.streamID)
}
}
}