mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 00:29:58 +00:00
AUTH-2596 added new logger package and replaced logrus
This commit is contained in:
@@ -7,8 +7,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cloudflare/cloudflared/logger"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@@ -48,7 +48,7 @@ type MuxerConfig struct {
|
||||
// The minimum number of heartbeats to send before terminating the connection.
|
||||
MaxHeartbeats uint64
|
||||
// Logger to use
|
||||
Logger *log.Entry
|
||||
Logger logger.Service
|
||||
CompressionQuality CompressionSetting
|
||||
// Initial size for HTTP2 flow control windows
|
||||
DefaultWindowSize uint32
|
||||
@@ -136,10 +136,10 @@ func Handshake(
|
||||
handshakeSetting := http2.Setting{ID: SettingMuxerMagic, Val: MuxerMagicEdge}
|
||||
compressionSetting := http2.Setting{ID: SettingCompression, Val: config.CompressionQuality.toH2Setting()}
|
||||
if CompressionIsSupported() {
|
||||
log.Debug("Compression is supported")
|
||||
config.Logger.Debug("muxer: Compression is supported")
|
||||
m.compressionQuality = config.CompressionQuality.getPreset()
|
||||
} else {
|
||||
log.Debug("Compression is not supported")
|
||||
config.Logger.Debug("muxer: Compression is not supported")
|
||||
compressionSetting = http2.Setting{ID: SettingCompression, Val: 0}
|
||||
}
|
||||
|
||||
@@ -176,12 +176,12 @@ func Handshake(
|
||||
// Sanity check to enusre idelDuration is sane
|
||||
if idleDuration == 0 || idleDuration < defaultTimeout {
|
||||
idleDuration = defaultTimeout
|
||||
config.Logger.Warn("Minimum idle time has been adjusted to ", defaultTimeout)
|
||||
config.Logger.Infof("muxer: Minimum idle time has been adjusted to %d", defaultTimeout)
|
||||
}
|
||||
maxRetries := config.MaxHeartbeats
|
||||
if maxRetries == 0 {
|
||||
maxRetries = defaultRetries
|
||||
config.Logger.Warn("Minimum number of unacked heartbeats to send before closing the connection has been adjusted to ", maxRetries)
|
||||
config.Logger.Infof("muxer: Minimum number of unacked heartbeats to send before closing the connection has been adjusted to %d", maxRetries)
|
||||
}
|
||||
|
||||
compBytesBefore, compBytesAfter := NewAtomicCounter(0), NewAtomicCounter(0)
|
||||
|
@@ -15,8 +15,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cloudflare/cloudflared/logger"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
@@ -28,7 +28,7 @@ const (
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
if os.Getenv("VERBOSE") == "1" {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
//TODO: set log level
|
||||
}
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
@@ -51,7 +51,7 @@ func NewDefaultMuxerPair(t assert.TestingT, testName string, f MuxedStreamFunc)
|
||||
Handler: f,
|
||||
IsClient: true,
|
||||
Name: "origin",
|
||||
Logger: log.NewEntry(log.New()),
|
||||
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
|
||||
DefaultWindowSize: (1 << 8) - 1,
|
||||
MaxWindowSize: (1 << 15) - 1,
|
||||
StreamWriteBufferMaxLen: 1024,
|
||||
@@ -63,7 +63,7 @@ func NewDefaultMuxerPair(t assert.TestingT, testName string, f MuxedStreamFunc)
|
||||
Timeout: testHandshakeTimeout,
|
||||
IsClient: false,
|
||||
Name: "edge",
|
||||
Logger: log.NewEntry(log.New()),
|
||||
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
|
||||
DefaultWindowSize: (1 << 8) - 1,
|
||||
MaxWindowSize: (1 << 15) - 1,
|
||||
StreamWriteBufferMaxLen: 1024,
|
||||
@@ -86,7 +86,7 @@ func NewCompressedMuxerPair(t assert.TestingT, testName string, quality Compress
|
||||
IsClient: true,
|
||||
Name: "origin",
|
||||
CompressionQuality: quality,
|
||||
Logger: log.NewEntry(log.New()),
|
||||
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
|
||||
HeartbeatInterval: defaultTimeout,
|
||||
MaxHeartbeats: defaultRetries,
|
||||
},
|
||||
@@ -96,7 +96,7 @@ func NewCompressedMuxerPair(t assert.TestingT, testName string, quality Compress
|
||||
IsClient: false,
|
||||
Name: "edge",
|
||||
CompressionQuality: quality,
|
||||
Logger: log.NewEntry(log.New()),
|
||||
Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
|
||||
HeartbeatInterval: defaultTimeout,
|
||||
MaxHeartbeats: defaultRetries,
|
||||
},
|
||||
@@ -301,6 +301,7 @@ func TestSingleStreamLargeResponseBody(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMultipleStreams(t *testing.T) {
|
||||
l := logger.NewOutputWriter(logger.NewMockWriteManager())
|
||||
f := MuxedStreamFunc(func(stream *MuxedStream) error {
|
||||
if len(stream.Headers) != 1 {
|
||||
t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
|
||||
@@ -308,13 +309,13 @@ func TestMultipleStreams(t *testing.T) {
|
||||
if stream.Headers[0].Name != "client-token" {
|
||||
t.Fatalf("expected header name %s, got %s", "client-token", stream.Headers[0].Name)
|
||||
}
|
||||
log.Debugf("Got request for stream %s", stream.Headers[0].Value)
|
||||
l.Debugf("Got request for stream %s", stream.Headers[0].Value)
|
||||
stream.WriteHeaders([]Header{
|
||||
{Name: "response-token", Value: stream.Headers[0].Value},
|
||||
})
|
||||
log.Debugf("Wrote headers for stream %s", stream.Headers[0].Value)
|
||||
l.Debugf("Wrote headers for stream %s", stream.Headers[0].Value)
|
||||
stream.Write([]byte("OK"))
|
||||
log.Debugf("Wrote body for stream %s", stream.Headers[0].Value)
|
||||
l.Debugf("Wrote body for stream %s", stream.Headers[0].Value)
|
||||
return nil
|
||||
})
|
||||
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
|
||||
@@ -332,7 +333,7 @@ func TestMultipleStreams(t *testing.T) {
|
||||
[]Header{{Name: "client-token", Value: tokenString}},
|
||||
nil,
|
||||
)
|
||||
log.Debugf("Got headers for stream %d", tokenId)
|
||||
l.Debugf("Got headers for stream %d", tokenId)
|
||||
if err != nil {
|
||||
errorsC <- err
|
||||
return
|
||||
@@ -370,7 +371,7 @@ func TestMultipleStreams(t *testing.T) {
|
||||
testFail := false
|
||||
for err := range errorsC {
|
||||
testFail = true
|
||||
log.Error(err)
|
||||
l.Errorf("%s", err)
|
||||
}
|
||||
if testFail {
|
||||
t.Fatalf("TestMultipleStreams failed")
|
||||
@@ -448,6 +449,8 @@ func TestMultipleStreamsFlowControl(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGracefulShutdown(t *testing.T) {
|
||||
l := logger.NewOutputWriter(logger.NewMockWriteManager())
|
||||
|
||||
sendC := make(chan struct{})
|
||||
responseBuf := bytes.Repeat([]byte("Hello world"), 65536)
|
||||
|
||||
@@ -456,17 +459,17 @@ func TestGracefulShutdown(t *testing.T) {
|
||||
{Name: "response-header", Value: "responseValue"},
|
||||
})
|
||||
<-sendC
|
||||
log.Debugf("Writing %d bytes", len(responseBuf))
|
||||
l.Debugf("Writing %d bytes", len(responseBuf))
|
||||
stream.Write(responseBuf)
|
||||
stream.CloseWrite()
|
||||
log.Debugf("Wrote %d bytes", len(responseBuf))
|
||||
l.Debugf("Wrote %d bytes", len(responseBuf))
|
||||
// Reading from the stream will block until the edge closes its end of the stream.
|
||||
// Otherwise, we'll close the whole connection before receiving the 'stream closed'
|
||||
// message from the edge.
|
||||
// Graceful shutdown works if you omit this, it just gives spurious errors for now -
|
||||
// TODO ignore errors when writing 'stream closed' and we're shutting down.
|
||||
stream.Read([]byte{0})
|
||||
log.Debugf("Handler ends")
|
||||
l.Debugf("Handler ends")
|
||||
return nil
|
||||
})
|
||||
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
|
||||
@@ -483,7 +486,7 @@ func TestGracefulShutdown(t *testing.T) {
|
||||
muxPair.EdgeMux.Shutdown()
|
||||
close(sendC)
|
||||
responseBody := make([]byte, len(responseBuf))
|
||||
log.Debugf("Waiting for %d bytes", len(responseBuf))
|
||||
l.Debugf("Waiting for %d bytes", len(responseBuf))
|
||||
n, err := io.ReadFull(stream, responseBody)
|
||||
if err != nil {
|
||||
t.Fatalf("error from (*MuxedStream).Read with %d bytes read: %s", n, err)
|
||||
@@ -676,6 +679,7 @@ func AssertIfPipeReadable(t *testing.T, pipe io.ReadCloser) {
|
||||
}
|
||||
|
||||
func TestMultipleStreamsWithDictionaries(t *testing.T) {
|
||||
l := logger.NewOutputWriter(logger.NewMockWriteManager())
|
||||
|
||||
for q := CompressionNone; q <= CompressionMax; q++ {
|
||||
htmlBody := `<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"` +
|
||||
@@ -812,7 +816,7 @@ func TestMultipleStreamsWithDictionaries(t *testing.T) {
|
||||
testFail := false
|
||||
for err := range errorsC {
|
||||
testFail = true
|
||||
log.Error(err)
|
||||
l.Errorf("%s", err)
|
||||
}
|
||||
if testFail {
|
||||
t.Fatalf("TestMultipleStreams failed")
|
||||
@@ -826,6 +830,8 @@ func TestMultipleStreamsWithDictionaries(t *testing.T) {
|
||||
}
|
||||
|
||||
func sampleSiteHandler(files map[string][]byte) MuxedStreamFunc {
|
||||
l := logger.NewOutputWriter(logger.NewMockWriteManager())
|
||||
|
||||
return func(stream *MuxedStream) error {
|
||||
var contentType string
|
||||
var pathHeader Header
|
||||
@@ -853,13 +859,13 @@ func sampleSiteHandler(files map[string][]byte) MuxedStreamFunc {
|
||||
stream.WriteHeaders([]Header{
|
||||
Header{Name: "content-type", Value: contentType},
|
||||
})
|
||||
log.Debugf("Wrote headers for stream %s", pathHeader.Value)
|
||||
l.Debugf("Wrote headers for stream %s", pathHeader.Value)
|
||||
file, ok := files[pathHeader.Value]
|
||||
if !ok {
|
||||
return fmt.Errorf("%s content is not preloaded", pathHeader.Value)
|
||||
}
|
||||
stream.Write(file)
|
||||
log.Debugf("Wrote body for stream %s", pathHeader.Value)
|
||||
l.Debugf("Wrote body for stream %s", pathHeader.Value)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@@ -4,10 +4,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cloudflare/cloudflared/logger"
|
||||
"github.com/golang-collections/collections/queue"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// data points used to compute average receive window and send window size
|
||||
@@ -22,7 +21,7 @@ type muxMetricsUpdater interface {
|
||||
// metrics returns the latest metrics
|
||||
metrics() *MuxerMetrics
|
||||
// run is a blocking call to start the event loop
|
||||
run(logger *log.Entry) error
|
||||
run(logger logger.Service) error
|
||||
// updateRTTChan is called by muxReader to report new RTT measurements
|
||||
updateRTT(rtt *roundTripMeasurement)
|
||||
//updateReceiveWindowChan is called by muxReader and muxWriter when receiveWindow size is updated
|
||||
@@ -139,34 +138,30 @@ func (updater *muxMetricsUpdaterImpl) metrics() *MuxerMetrics {
|
||||
return m
|
||||
}
|
||||
|
||||
func (updater *muxMetricsUpdaterImpl) run(parentLogger *log.Entry) error {
|
||||
logger := parentLogger.WithFields(log.Fields{
|
||||
"subsystem": "mux",
|
||||
"dir": "metrics",
|
||||
})
|
||||
defer logger.Debug("event loop finished")
|
||||
func (updater *muxMetricsUpdaterImpl) run(logger logger.Service) error {
|
||||
defer logger.Debug("mux - metrics: event loop finished")
|
||||
for {
|
||||
select {
|
||||
case <-updater.abortChan:
|
||||
logger.Infof("Stopping mux metrics updater")
|
||||
logger.Infof("mux - metrics: Stopping mux metrics updater")
|
||||
return nil
|
||||
case roundTripMeasurement := <-updater.updateRTTChan:
|
||||
go updater.rttData.update(roundTripMeasurement)
|
||||
logger.Debug("Update rtt")
|
||||
logger.Debug("mux - metrics: Update rtt")
|
||||
case receiveWindow := <-updater.updateReceiveWindowChan:
|
||||
go updater.receiveWindowData.update(receiveWindow)
|
||||
logger.Debug("Update receive window")
|
||||
logger.Debug("mux - metrics: Update receive window")
|
||||
case sendWindow := <-updater.updateSendWindowChan:
|
||||
go updater.sendWindowData.update(sendWindow)
|
||||
logger.Debug("Update send window")
|
||||
logger.Debug("mux - metrics: Update send window")
|
||||
case inBoundBytes := <-updater.updateInBoundBytesChan:
|
||||
// inBoundBytes is bytes/sec because the update interval is 1 sec
|
||||
go updater.inBoundRate.update(inBoundBytes)
|
||||
logger.Debugf("Inbound bytes %d", inBoundBytes)
|
||||
logger.Debugf("mux - metrics: Inbound bytes %d", inBoundBytes)
|
||||
case outBoundBytes := <-updater.updateOutBoundBytesChan:
|
||||
// outBoundBytes is bytes/sec because the update interval is 1 sec
|
||||
go updater.outBoundRate.update(outBoundBytes)
|
||||
logger.Debugf("Outbound bytes %d", outBoundBytes)
|
||||
logger.Debugf("mux - metrics: Outbound bytes %d", outBoundBytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -5,7 +5,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/cloudflare/cloudflared/logger"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@@ -91,7 +91,7 @@ func TestMuxMetricsUpdater(t *testing.T) {
|
||||
abortChan := make(chan struct{})
|
||||
compBefore, compAfter := NewAtomicCounter(0), NewAtomicCounter(0)
|
||||
m := newMuxMetricsUpdater(abortChan, compBefore, compAfter)
|
||||
logger := log.NewEntry(log.New())
|
||||
logger := logger.NewOutputWriter(logger.NewMockWriteManager())
|
||||
|
||||
go func() {
|
||||
errChan <- m.run(logger)
|
||||
|
@@ -3,11 +3,12 @@ package h2mux
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/cloudflare/cloudflared/logger"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
@@ -67,12 +68,8 @@ func (r *MuxReader) Shutdown() <-chan struct{} {
|
||||
return done
|
||||
}
|
||||
|
||||
func (r *MuxReader) run(parentLogger *log.Entry) error {
|
||||
logger := parentLogger.WithFields(log.Fields{
|
||||
"subsystem": "mux",
|
||||
"dir": "read",
|
||||
})
|
||||
defer logger.Debug("event loop finished")
|
||||
func (r *MuxReader) run(logger logger.Service) error {
|
||||
defer logger.Debug("mux - read: event loop finished")
|
||||
|
||||
// routine to periodically update bytesRead
|
||||
go func() {
|
||||
@@ -90,13 +87,13 @@ func (r *MuxReader) run(parentLogger *log.Entry) error {
|
||||
for {
|
||||
frame, err := r.f.ReadFrame()
|
||||
if err != nil {
|
||||
errLogger := logger.WithError(err)
|
||||
errorString := fmt.Sprintf("mux - read: %s", err)
|
||||
if errorDetail := r.f.ErrorDetail(); errorDetail != nil {
|
||||
errLogger = errLogger.WithField("errorDetail", errorDetail)
|
||||
errorString = fmt.Sprintf("%s: errorDetail: %s", errorString, errorDetail)
|
||||
}
|
||||
switch e := err.(type) {
|
||||
case http2.StreamError:
|
||||
errLogger.Warn("stream error")
|
||||
logger.Infof("%s: stream error", errorString)
|
||||
// Ideally we wouldn't return here, since that aborts the muxer.
|
||||
// We should communicate the error to the relevant MuxedStream
|
||||
// data structure, so that callers of MuxedStream.Read() and
|
||||
@@ -104,25 +101,25 @@ func (r *MuxReader) run(parentLogger *log.Entry) error {
|
||||
// and keep the muxer going.
|
||||
return r.streamError(e.StreamID, e.Code)
|
||||
case http2.ConnectionError:
|
||||
errLogger.Warn("connection error")
|
||||
logger.Infof("%s: stream error", errorString)
|
||||
return r.connectionError(err)
|
||||
default:
|
||||
if isConnectionClosedError(err) {
|
||||
if r.streams.Len() == 0 {
|
||||
// don't log the error here -- that would just be extra noise
|
||||
logger.Debug("shutting down")
|
||||
logger.Debug("mux - read: shutting down")
|
||||
return nil
|
||||
}
|
||||
errLogger.Warn("connection closed unexpectedly")
|
||||
logger.Infof("%s: connection closed unexpectedly", errorString)
|
||||
return err
|
||||
} else {
|
||||
errLogger.Warn("frame read error")
|
||||
logger.Infof("%s: frame read error", errorString)
|
||||
return r.connectionError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
r.connActive.Signal()
|
||||
logger.WithField("data", frame).Debug("read frame")
|
||||
logger.Debugf("mux - read: read frame: data %v", frame)
|
||||
switch f := frame.(type) {
|
||||
case *http2.DataFrame:
|
||||
err = r.receiveFrameData(f, logger)
|
||||
@@ -158,7 +155,7 @@ func (r *MuxReader) run(parentLogger *log.Entry) error {
|
||||
err = ErrUnexpectedFrameType
|
||||
}
|
||||
if err != nil {
|
||||
logger.WithField("data", frame).WithError(err).Debug("frame error")
|
||||
logger.Debugf("mux - read: read error: data %v", frame)
|
||||
return r.connectionError(err)
|
||||
}
|
||||
}
|
||||
@@ -279,8 +276,7 @@ func (r *MuxReader) handleStream(stream *MuxedStream) {
|
||||
}
|
||||
|
||||
// Receives a data frame from a stream. A non-nil error is a connection error.
|
||||
func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, parentLogger *log.Entry) error {
|
||||
logger := parentLogger.WithField("stream", frame.Header().StreamID)
|
||||
func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, logger logger.Service) error {
|
||||
stream, err := r.getStreamForFrame(frame)
|
||||
if err != nil {
|
||||
return r.defaultStreamErrorHandler(err, frame.Header())
|
||||
@@ -296,9 +292,9 @@ func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, parentLogger *log.E
|
||||
if frame.Header().Flags.Has(http2.FlagDataEndStream) {
|
||||
if stream.receiveEOF() {
|
||||
r.streams.Delete(stream.streamID)
|
||||
logger.Debug("stream closed")
|
||||
logger.Debugf("mux - read: stream closed: streamID: %d", frame.Header().StreamID)
|
||||
} else {
|
||||
logger.Debug("shutdown receive side")
|
||||
logger.Debugf("mux - read: shutdown receive side: streamID: %d", frame.Header().StreamID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -6,7 +6,7 @@ import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/cloudflare/cloudflared/logger"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
)
|
||||
@@ -72,12 +72,8 @@ func tsToPingData(ts int64) [8]byte {
|
||||
return pingData
|
||||
}
|
||||
|
||||
func (w *MuxWriter) run(parentLogger *log.Entry) error {
|
||||
logger := parentLogger.WithFields(log.Fields{
|
||||
"subsystem": "mux",
|
||||
"dir": "write",
|
||||
})
|
||||
defer logger.Debug("event loop finished")
|
||||
func (w *MuxWriter) run(logger logger.Service) error {
|
||||
defer logger.Debug("mux - write: event loop finished")
|
||||
|
||||
// routine to periodically communicate bytesWrote
|
||||
go func() {
|
||||
@@ -95,17 +91,17 @@ func (w *MuxWriter) run(parentLogger *log.Entry) error {
|
||||
for {
|
||||
select {
|
||||
case <-w.abortChan:
|
||||
logger.Debug("aborting writer thread")
|
||||
logger.Debug("mux - write: aborting writer thread")
|
||||
return nil
|
||||
case errCode := <-w.goAwayChan:
|
||||
logger.Debug("sending GOAWAY code ", errCode)
|
||||
logger.Debugf("mux - write: sending GOAWAY code %v", errCode)
|
||||
err := w.f.WriteGoAway(w.streams.LastPeerStreamID(), errCode, []byte{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.idleTimer.MarkActive()
|
||||
case <-w.pingTimestamp.GetUpdateChan():
|
||||
logger.Debug("sending PING ACK")
|
||||
logger.Debug("mux - write: sending PING ACK")
|
||||
err := w.f.WritePing(true, tsToPingData(w.pingTimestamp.Get()))
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -115,7 +111,7 @@ func (w *MuxWriter) run(parentLogger *log.Entry) error {
|
||||
if !w.idleTimer.Retry() {
|
||||
return ErrConnectionDropped
|
||||
}
|
||||
logger.Debug("sending PING")
|
||||
logger.Debug("mux - write: sending PING")
|
||||
err := w.f.WritePing(false, tsToPingData(time.Now().UnixNano()))
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -125,7 +121,7 @@ func (w *MuxWriter) run(parentLogger *log.Entry) error {
|
||||
w.idleTimer.MarkActive()
|
||||
case <-w.streamErrors.GetSignalChan():
|
||||
for streamID, errCode := range w.streamErrors.GetErrors() {
|
||||
logger.WithField("stream", streamID).WithField("code", errCode).Debug("resetting stream")
|
||||
logger.Debugf("mux - write: resetting stream with code: %v streamID: %d", errCode, streamID)
|
||||
err := w.f.WriteRSTStream(streamID, errCode)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -145,19 +141,17 @@ func (w *MuxWriter) run(parentLogger *log.Entry) error {
|
||||
if streamRequest.body != nil {
|
||||
go streamRequest.flushBody()
|
||||
}
|
||||
streamLogger := logger.WithField("stream", streamID)
|
||||
err := w.writeStreamData(streamRequest.stream, streamLogger)
|
||||
err := w.writeStreamData(streamRequest.stream, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.idleTimer.MarkActive()
|
||||
case streamID := <-w.readyStreamChan:
|
||||
streamLogger := logger.WithField("stream", streamID)
|
||||
stream, ok := w.streams.Get(streamID)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
err := w.writeStreamData(stream, streamLogger)
|
||||
err := w.writeStreamData(stream, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -165,7 +159,7 @@ func (w *MuxWriter) run(parentLogger *log.Entry) error {
|
||||
case useDict := <-w.useDictChan:
|
||||
err := w.writeUseDictionary(useDict)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warn("error writing use dictionary")
|
||||
logger.Errorf("mux - write: error writing use dictionary: %s", err)
|
||||
return err
|
||||
}
|
||||
w.idleTimer.MarkActive()
|
||||
@@ -173,18 +167,18 @@ func (w *MuxWriter) run(parentLogger *log.Entry) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *MuxWriter) writeStreamData(stream *MuxedStream, logger *log.Entry) error {
|
||||
logger.Debug("writable")
|
||||
func (w *MuxWriter) writeStreamData(stream *MuxedStream, logger logger.Service) error {
|
||||
logger.Debugf("mux - write: writable: streamID: %d", stream.streamID)
|
||||
chunk := stream.getChunk()
|
||||
w.metricsUpdater.updateReceiveWindow(stream.getReceiveWindow())
|
||||
w.metricsUpdater.updateSendWindow(stream.getSendWindow())
|
||||
if chunk.sendHeadersFrame() {
|
||||
err := w.writeHeaders(chunk.streamID, chunk.headers)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warn("error writing headers")
|
||||
logger.Errorf("mux - write: error writing headers: %s: streamID: %d", err, stream.streamID)
|
||||
return err
|
||||
}
|
||||
logger.Debug("output headers")
|
||||
logger.Debugf("mux - write: output headers: streamID: %d", stream.streamID)
|
||||
}
|
||||
|
||||
if chunk.sendWindowUpdateFrame() {
|
||||
@@ -195,22 +189,22 @@ func (w *MuxWriter) writeStreamData(stream *MuxedStream, logger *log.Entry) erro
|
||||
// window, unless the receiver treats this as a connection error"
|
||||
err := w.f.WriteWindowUpdate(chunk.streamID, chunk.windowUpdate)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warn("error writing window update")
|
||||
logger.Errorf("mux - write: error writing window update: %s: streamID: %d", err, stream.streamID)
|
||||
return err
|
||||
}
|
||||
logger.Debugf("increment receive window by %d", chunk.windowUpdate)
|
||||
logger.Debugf("mux - write: increment receive window by %d streamID: %d", chunk.windowUpdate, stream.streamID)
|
||||
}
|
||||
|
||||
for chunk.sendDataFrame() {
|
||||
payload, sentEOF := chunk.nextDataFrame(int(w.maxFrameSize))
|
||||
err := w.f.WriteData(chunk.streamID, sentEOF, payload)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warn("error writing data")
|
||||
logger.Errorf("mux - write: error writing data: %s: streamID: %d", err, stream.streamID)
|
||||
return err
|
||||
}
|
||||
// update the amount of data wrote
|
||||
w.bytesWrote.IncrementBy(uint64(len(payload)))
|
||||
logger.WithField("len", len(payload)).Debug("output data")
|
||||
logger.Errorf("mux - write: output data: %d: streamID: %d", len(payload), stream.streamID)
|
||||
|
||||
if sentEOF {
|
||||
if stream.readBuffer.Closed() {
|
||||
@@ -218,15 +212,15 @@ func (w *MuxWriter) writeStreamData(stream *MuxedStream, logger *log.Entry) erro
|
||||
if !stream.gotReceiveEOF() {
|
||||
// the peer may send data that we no longer want to receive. Force them into the
|
||||
// closed state.
|
||||
logger.Debug("resetting stream")
|
||||
logger.Debugf("mux - write: resetting stream: streamID: %d", stream.streamID)
|
||||
w.f.WriteRSTStream(chunk.streamID, http2.ErrCodeNo)
|
||||
} else {
|
||||
// Half-open stream transitioned into closed
|
||||
logger.Debug("closing stream")
|
||||
logger.Debugf("mux - write: closing stream: streamID: %d", stream.streamID)
|
||||
}
|
||||
w.streams.Delete(chunk.streamID)
|
||||
} else {
|
||||
logger.Debug("closing stream write side")
|
||||
logger.Debugf("mux - write: closing stream write side: streamID: %d", stream.streamID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user