TUN-2506: Expose active streams metrics

This commit is contained in:
Chung-Ting Huang
2019-11-05 14:37:40 -06:00
parent 3a2e12818f
commit 13bf65ce4e
62 changed files with 281 additions and 9308 deletions

View File

@@ -3,6 +3,7 @@ package h2mux
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/http2"
)
@@ -22,13 +23,16 @@ type activeStreamMap struct {
// ignoreNewStreams is true when the connection is being shut down. New streams
// cannot be registered.
ignoreNewStreams bool
// activeStreams is a gauge shared by all muxers of this process to expose the total number of active streams
activeStreams prometheus.Gauge
}
func newActiveStreamMap(useClientStreamNumbers bool) *activeStreamMap {
func newActiveStreamMap(useClientStreamNumbers bool, activeStreams prometheus.Gauge) *activeStreamMap {
m := &activeStreamMap{
streams: make(map[uint32]*MuxedStream),
streamsEmpty: make(chan struct{}),
nextStreamID: 1,
streams: make(map[uint32]*MuxedStream),
streamsEmpty: make(chan struct{}),
nextStreamID: 1,
activeStreams: activeStreams,
}
// Client initiated stream uses odd stream ID, server initiated stream uses even stream ID
if !useClientStreamNumbers {
@@ -63,6 +67,7 @@ func (m *activeStreamMap) Set(newStream *MuxedStream) bool {
return false
}
m.streams[newStream.streamID] = newStream
m.activeStreams.Inc()
return true
}
@@ -70,7 +75,10 @@ func (m *activeStreamMap) Set(newStream *MuxedStream) bool {
func (m *activeStreamMap) Delete(streamID uint32) {
m.Lock()
defer m.Unlock()
delete(m.streams, streamID)
if _, ok := m.streams[streamID]; ok {
delete(m.streams, streamID)
m.activeStreams.Dec()
}
if len(m.streams) == 0 && m.streamsEmpty != nil {
close(m.streamsEmpty)
m.streamsEmpty = nil

View File

@@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
@@ -107,6 +108,7 @@ func Handshake(
w io.WriteCloser,
r io.ReadCloser,
config MuxerConfig,
activeStreamsMetrics prometheus.Gauge,
) (*Muxer, error) {
// Set default config values
if config.Timeout == 0 {
@@ -130,7 +132,7 @@ func Handshake(
newStreamChan: make(chan MuxedStreamRequest),
abortChan: make(chan struct{}),
readyList: NewReadyList(),
streams: newActiveStreamMap(config.IsClient),
streams: newActiveStreamMap(config.IsClient, activeStreamsMetrics),
}
m.f.ReadMetaHeaders = hpack.NewDecoder(4096, func(hpack.HeaderField) {})

View File

@@ -43,7 +43,7 @@ type DefaultMuxerPair struct {
doneC chan struct{}
}
func NewDefaultMuxerPair(t assert.TestingT, f MuxedStreamFunc) *DefaultMuxerPair {
func NewDefaultMuxerPair(t assert.TestingT, testName string, f MuxedStreamFunc) *DefaultMuxerPair {
origin, edge := net.Pipe()
p := &DefaultMuxerPair{
OriginMuxConfig: MuxerConfig{
@@ -69,11 +69,11 @@ func NewDefaultMuxerPair(t assert.TestingT, f MuxedStreamFunc) *DefaultMuxerPair
EdgeConn: edge,
doneC: make(chan struct{}),
}
assert.NoError(t, p.Handshake())
assert.NoError(t, p.Handshake(testName))
return p
}
func NewCompressedMuxerPair(t assert.TestingT, quality CompressionSetting, f MuxedStreamFunc) *DefaultMuxerPair {
func NewCompressedMuxerPair(t assert.TestingT, testName string, quality CompressionSetting, f MuxedStreamFunc) *DefaultMuxerPair {
origin, edge := net.Pipe()
p := &DefaultMuxerPair{
OriginMuxConfig: MuxerConfig{
@@ -95,20 +95,20 @@ func NewCompressedMuxerPair(t assert.TestingT, quality CompressionSetting, f Mux
EdgeConn: edge,
doneC: make(chan struct{}),
}
assert.NoError(t, p.Handshake())
assert.NoError(t, p.Handshake(testName))
return p
}
func (p *DefaultMuxerPair) Handshake() error {
func (p *DefaultMuxerPair) Handshake(testName string) error {
ctx, cancel := context.WithTimeout(context.Background(), testHandshakeTimeout)
defer cancel()
errGroup, _ := errgroup.WithContext(ctx)
errGroup.Go(func() (err error) {
p.EdgeMux, err = Handshake(p.EdgeConn, p.EdgeConn, p.EdgeMuxConfig)
p.EdgeMux, err = Handshake(p.EdgeConn, p.EdgeConn, p.EdgeMuxConfig, NewActiveStreamsMetrics(testName, "edge"))
return errors.Wrap(err, "edge handshake failure")
})
errGroup.Go(func() (err error) {
p.OriginMux, err = Handshake(p.OriginConn, p.OriginConn, p.OriginMuxConfig)
p.OriginMux, err = Handshake(p.OriginConn, p.OriginConn, p.OriginMuxConfig, NewActiveStreamsMetrics(testName, "origin"))
return errors.Wrap(err, "origin handshake failure")
})
@@ -161,7 +161,7 @@ func TestHandshake(t *testing.T) {
f := func(stream *MuxedStream) error {
return nil
}
muxPair := NewDefaultMuxerPair(t, f)
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
AssertIfPipeReadable(t, muxPair.OriginConn)
AssertIfPipeReadable(t, muxPair.EdgeConn)
}
@@ -191,7 +191,7 @@ func TestSingleStream(t *testing.T) {
}
return nil
})
muxPair := NewDefaultMuxerPair(t, f)
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
muxPair.Serve(t)
stream, err := muxPair.OpenEdgeMuxStream(
@@ -262,7 +262,7 @@ func TestSingleStreamLargeResponseBody(t *testing.T) {
return nil
})
muxPair := NewDefaultMuxerPair(t, f)
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
muxPair.Serve(t)
stream, err := muxPair.OpenEdgeMuxStream(
@@ -309,7 +309,7 @@ func TestMultipleStreams(t *testing.T) {
log.Debugf("Wrote body for stream %s", stream.Headers[0].Value)
return nil
})
muxPair := NewDefaultMuxerPair(t, f)
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
muxPair.Serve(t)
maxStreams := 64
@@ -402,7 +402,7 @@ func TestMultipleStreamsFlowControl(t *testing.T) {
}
return nil
})
muxPair := NewDefaultMuxerPair(t, f)
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
muxPair.Serve(t)
errGroup, _ := errgroup.WithContext(context.Background())
@@ -461,7 +461,7 @@ func TestGracefulShutdown(t *testing.T) {
log.Debugf("Handler ends")
return nil
})
muxPair := NewDefaultMuxerPair(t, f)
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
muxPair.Serve(t)
stream, err := muxPair.OpenEdgeMuxStream(
@@ -516,7 +516,7 @@ func TestUnexpectedShutdown(t *testing.T) {
}
return nil
})
muxPair := NewDefaultMuxerPair(t, f)
muxPair := NewDefaultMuxerPair(t, t.Name(), f)
muxPair.Serve(t)
stream, err := muxPair.OpenEdgeMuxStream(
@@ -564,7 +564,7 @@ func EchoHandler(stream *MuxedStream) error {
func TestOpenAfterDisconnect(t *testing.T) {
for i := 0; i < 3; i++ {
muxPair := NewDefaultMuxerPair(t, EchoHandler)
muxPair := NewDefaultMuxerPair(t, fmt.Sprintf("%s_%d", t.Name(), i), EchoHandler)
muxPair.Serve(t)
switch i {
@@ -591,7 +591,7 @@ func TestOpenAfterDisconnect(t *testing.T) {
}
func TestHPACK(t *testing.T) {
muxPair := NewDefaultMuxerPair(t, EchoHandler)
muxPair := NewDefaultMuxerPair(t, t.Name(), EchoHandler)
muxPair.Serve(t)
stream, err := muxPair.OpenEdgeMuxStream(
@@ -724,7 +724,7 @@ func TestMultipleStreamsWithDictionaries(t *testing.T) {
return nil
})
muxPair := NewCompressedMuxerPair(t, q, f)
muxPair := NewCompressedMuxerPair(t, fmt.Sprintf("%s_%d", t.Name(), q), q, f)
muxPair.Serve(t)
var wg sync.WaitGroup
@@ -918,7 +918,7 @@ func TestSampleSiteWithDictionaries(t *testing.T) {
assert.NoError(t, err)
for q := CompressionNone; q <= CompressionMax; q++ {
muxPair := NewCompressedMuxerPair(t, q, sampleSiteHandler(files))
muxPair := NewCompressedMuxerPair(t, fmt.Sprintf("%s_%d", t.Name(), q), q, sampleSiteHandler(files))
muxPair.Serve(t)
var wg sync.WaitGroup
@@ -957,7 +957,7 @@ func TestLongSiteWithDictionaries(t *testing.T) {
files, err := loadSampleFiles(paths)
assert.NoError(t, err)
for q := CompressionNone; q <= CompressionMedium; q++ {
muxPair := NewCompressedMuxerPair(t, q, sampleSiteHandler(files))
muxPair := NewCompressedMuxerPair(t, fmt.Sprintf("%s_%d", t.Name(), q), q, sampleSiteHandler(files))
muxPair.Serve(t)
rand.Seed(time.Now().Unix())
@@ -998,7 +998,7 @@ func BenchmarkOpenStream(b *testing.B) {
})
return nil
})
muxPair := NewDefaultMuxerPair(b, f)
muxPair := NewDefaultMuxerPair(b, fmt.Sprintf("%s_%d", b.Name(), i), f)
muxPair.Serve(b)
b.StartTimer()
openStreams(b, muxPair, streams)

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/golang-collections/collections/queue"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
@@ -299,3 +300,14 @@ func (r *rate) get() (curr, min, max uint64) {
defer r.lock.RUnlock()
return r.curr, r.min, r.max
}
func NewActiveStreamsMetrics(namespace, subsystem string) prometheus.Gauge {
activeStreams := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "active_streams",
Help: "Number of active streams created by all muxers.",
})
prometheus.MustRegister(activeStreams)
return activeStreams
}

View File

@@ -59,7 +59,7 @@ func assertOpenStreamSucceed(t *testing.T, stream *MuxedStream, err error) {
func TestMissingHeaders(t *testing.T) {
originHandler := &mockOriginStreamHandler{}
muxPair := NewDefaultMuxerPair(t, originHandler.ServeStream)
muxPair := NewDefaultMuxerPair(t, t.Name(), originHandler.ServeStream)
muxPair.Serve(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -83,7 +83,7 @@ func TestMissingHeaders(t *testing.T) {
func TestReceiveHeaderData(t *testing.T) {
originHandler := &mockOriginStreamHandler{}
muxPair := NewDefaultMuxerPair(t, originHandler.ServeStream)
muxPair := NewDefaultMuxerPair(t, t.Name(), originHandler.ServeStream)
muxPair.Serve(t)
reqHeaders := []Header{