AUTH-2105: Adds support for local forwarding. Refactor auditlogger creation.

AUTH-2088: Adds dynamic destination routing
This commit is contained in:
Michael Borkenstein
2019-10-02 15:56:28 -05:00
parent dbde3870da
commit 91d9dca34e
669 changed files with 74279 additions and 18300 deletions

View File

@@ -24,6 +24,7 @@
package channelz
import (
"fmt"
"sort"
"sync"
"sync/atomic"
@@ -95,9 +96,14 @@ func (d *dbWrapper) get() *channelMap {
// NewChannelzStorage initializes channelz data storage and id generator.
//
// This function returns a cleanup function to wait for all channelz state to be reset by the
// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
// to remove some entity just register by the new test, since the id space is the same.
//
// Note: This function is exported for testing purpose only. User should not call
// it in most cases.
func NewChannelzStorage() {
func NewChannelzStorage() (cleanup func() error) {
db.set(&channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
@@ -107,6 +113,28 @@ func NewChannelzStorage() {
subChannels: make(map[int64]*subChannel),
})
idGen.reset()
return func() error {
var err error
cm := db.get()
if cm == nil {
return nil
}
for i := 0; i < 1000; i++ {
cm.mu.Lock()
if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
cm.mu.Unlock()
// all things stored in the channelz map have been cleared.
return nil
}
cm.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
cm.mu.Lock()
err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
cm.mu.Unlock()
return err
}
}
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a

View File

@@ -25,40 +25,11 @@ import (
)
const (
prefix = "GRPC_GO_"
retryStr = prefix + "RETRY"
requireHandshakeStr = prefix + "REQUIRE_HANDSHAKE"
)
// RequireHandshakeSetting describes the settings for handshaking.
type RequireHandshakeSetting int
const (
// RequireHandshakeOn indicates to wait for handshake before considering a
// connection ready/successful.
RequireHandshakeOn RequireHandshakeSetting = iota
// RequireHandshakeOff indicates to not wait for handshake before
// considering a connection ready/successful.
RequireHandshakeOff
prefix = "GRPC_GO_"
retryStr = prefix + "RETRY"
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
// RequireHandshake is set based upon the GRPC_GO_REQUIRE_HANDSHAKE
// environment variable.
//
// Will be removed after the 1.18 release.
RequireHandshake = RequireHandshakeOn
)
func init() {
switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
case "on":
fallthrough
default:
RequireHandshake = RequireHandshakeOn
case "off":
RequireHandshake = RequireHandshakeOff
}
}

View File

@@ -23,6 +23,8 @@ package internal
import (
"context"
"time"
"google.golang.org/grpc/connectivity"
)
var (
@@ -37,10 +39,25 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
// ParseServiceConfig is a function to parse JSON service configs into
// opaque data structures.
ParseServiceConfig func(sc string) (interface{}, error)
// StatusRawProto is exported by status/status.go. This func returns a
// pointer to the wrapped Status proto for a given status.Status without a
// call to proto.Clone(). The returned Status proto should not be mutated by
// the caller.
StatusRawProto interface{} // func (*status.Status) *spb.Status
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
//
// The implementation is expected to create a health checking RPC stream by
// calling newStream(), watch for the health status of serviceName, and report
// it's health back by calling setConnectivityState().
//
// The health checking protocol is defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), serviceName string) error
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.

View File

@@ -23,6 +23,7 @@ import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
@@ -84,24 +85,40 @@ func (il *itemList) isEmpty() bool {
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport. These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50
type cbItem interface {
isTransportResponseFrame() bool
}
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
}
func (*registerStream) isTransportResponseFrame() bool { return false }
// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
endStream bool // Valid on server side.
initStream func(uint32) (bool, error) // Used only on the client side.
endStream bool // Valid on server side.
initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
onOrphaned func(error) // Valid on client-side
}
func (h *headerFrame) isTransportResponseFrame() bool {
return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}
type cleanupStream struct {
streamID uint32
rst bool
@@ -109,6 +126,8 @@ type cleanupStream struct {
onWrite func()
}
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
type dataFrame struct {
streamID uint32
endStream bool
@@ -119,27 +138,41 @@ type dataFrame struct {
onEachWrite func()
}
func (*dataFrame) isTransportResponseFrame() bool { return false }
type incomingWindowUpdate struct {
streamID uint32
increment uint32
}
func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
type outgoingWindowUpdate struct {
streamID uint32
increment uint32
}
func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
return false // window updates are throttled by thresholds
}
type incomingSettings struct {
ss []http2.Setting
}
func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
type outgoingSettings struct {
ss []http2.Setting
}
func (*outgoingSettings) isTransportResponseFrame() bool { return false }
type incomingGoAway struct {
}
func (*incomingGoAway) isTransportResponseFrame() bool { return false }
type goAway struct {
code http2.ErrCode
debugData []byte
@@ -147,15 +180,21 @@ type goAway struct {
closeConn bool
}
func (*goAway) isTransportResponseFrame() bool { return false }
type ping struct {
ack bool
data [8]byte
}
func (*ping) isTransportResponseFrame() bool { return true }
type outFlowControlSizeRequest struct {
resp chan uint32
}
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
type outStreamState int
const (
@@ -238,6 +277,14 @@ type controlBuffer struct {
consumerWaiting bool
list *itemList
err error
// transportResponseFrames counts the number of queued items that represent
// the response of an action initiated by the peer. trfChan is created
// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
trfChan atomic.Value // *chan struct{}
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
@@ -248,12 +295,24 @@ func newControlBuffer(done <-chan struct{}) *controlBuffer {
}
}
func (c *controlBuffer) put(it interface{}) error {
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
func (c *controlBuffer) throttle() {
ch, _ := c.trfChan.Load().(*chan struct{})
if ch != nil {
select {
case <-*ch:
case <-c.done:
}
}
}
func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@@ -271,6 +330,15 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{
c.consumerWaiting = false
}
c.list.enqueue(it)
if it.isTransportResponseFrame() {
c.transportResponseFrames++
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
ch := make(chan struct{})
c.trfChan.Store(&ch)
}
}
c.mu.Unlock()
if wakeUp {
select {
@@ -304,7 +372,17 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
return nil, c.err
}
if !c.list.isEmpty() {
h := c.list.dequeue()
h := c.list.dequeue().(cbItem)
if h.isTransportResponseFrame() {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are removing the frame that put us over the
// threshold; close and clear the throttling channel.
ch := c.trfChan.Load().(*chan struct{})
close(*ch)
c.trfChan.Store((*chan struct{})(nil))
}
c.transportResponseFrames--
}
c.mu.Unlock()
return h, nil
}
@@ -559,21 +637,17 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
sendPing, err := hdr.initStream(str.id)
if err != nil {
if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
return nil
}
if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
}
l.estdStreams[str.id] = str
if sendPing {
return l.pingHandler(&ping{data: [8]byte{}})
}
return nil
}

View File

@@ -149,6 +149,7 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
n = uint32(math.MaxInt32)
}
f.mu.Lock()
defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
@@ -169,10 +170,8 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
f.mu.Unlock()
return f.delta
}
f.mu.Unlock()
return 0
}

View File

@@ -24,6 +24,7 @@
package transport
import (
"bytes"
"context"
"errors"
"fmt"
@@ -347,7 +348,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
ht.stats.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
}
@@ -361,7 +362,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
if n > 0 {
s.buf.put(recvMsg{data: buf[:n:n]})
s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
buf = buf[n:]
}
if err != nil {

View File

@@ -62,8 +62,6 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
// awakenKeepalive is used to wake up keepalive when after it has gone dormant.
awakenKeepalive chan struct{}
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
@@ -110,6 +108,16 @@ type http2Client struct {
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
// A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And
// since the number of active streams is guarded by the above mutex, we use
// the same for this condition variable as well.
kpDormancyCond *sync.Cond
// A boolean to track whether the keepalive goroutine is dormant or not.
// This is checked before attempting to signal the above condition
// variable.
kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
@@ -117,6 +125,8 @@ type http2Client struct {
onGoAway func(GoAwayReason)
onClose func()
bufferPool *bufferPool
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -230,7 +240,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@@ -249,6 +258,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
@@ -261,9 +271,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
updateFlowControl: t.updateFlowControl,
}
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -278,6 +285,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
@@ -367,6 +375,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
closeStream: func(err error) {
t.CloseStream(s, err)
},
freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -437,6 +446,15 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
var k string
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
for _, vv := range added {
for i, v := range vv {
if i%2 == 0 {
@@ -450,15 +468,6 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
}
}
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
@@ -489,6 +498,9 @@ func (t *http2Client) createAudience(callHdr *CallHdr) string {
}
func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
if len(t.perRPCCreds) == 0 {
return nil, nil
}
authData := map[string]string{}
for _, c := range t.perRPCCreds {
data, err := c.GetRequestMetadata(ctx, audience)
@@ -509,7 +521,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
}
func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
callAuthData := map[string]string{}
var callAuthData map[string]string
// Check if credentials.PerRPCCredentials were provided via call options.
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
@@ -521,6 +533,7 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
if err != nil {
return nil, status.Errorf(codes.Internal, "transport: %v", err)
}
callAuthData = make(map[string]string, len(data))
for k, v := range data {
// Capital header names are illegal in HTTP/2
k = strings.ToLower(k)
@@ -549,15 +562,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s.write(recvMsg{err: err})
close(s.done)
// If headerChan isn't closed, then close it.
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
close(s.headerChan)
}
}
hdr := &headerFrame{
hf: headerFields,
endStream: false,
initStream: func(id uint32) (bool, error) {
initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
t.mu.Unlock()
@@ -567,29 +579,19 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
err = ErrConnClosing
}
cleanup(err)
return false, err
return err
}
t.activeStreams[id] = s
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
var sendPing bool
// If the number of active streams change from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
if len(t.activeStreams) == 1 && t.keepaliveEnabled {
select {
case t.awakenKeepalive <- struct{}{}:
sendPing = true
// Fill the awakenKeepalive channel again as this channel must be
// kept non-writable except at the point that the keepalive()
// goroutine is waiting either to be awaken or shutdown.
t.awakenKeepalive <- struct{}{}
default:
}
// If the keepalive goroutine has gone dormant, wake it up.
if t.kpDormant {
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
return sendPing, nil
return nil
},
onOrphaned: cleanup,
wq: s.wq,
@@ -713,7 +715,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
s.write(recvMsg{err: err})
}
// If headerChan isn't closed, then close it.
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.noHeaders = true
close(s.headerChan)
}
@@ -765,9 +767,17 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
return nil
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
t.onClose()
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
if t.kpDormant {
// If the keepalive goroutine is blocked on this condition variable, we
// should unblock it so that the goroutine eventually exits.
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
@@ -785,7 +795,6 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
t.onClose()
return err
}
@@ -794,21 +803,21 @@ func (t *http2Client) Close() error {
// stream is closed. If there are no active streams, the transport is closed
// immediately. This does nothing if the transport is already draining or
// closing.
func (t *http2Client) GracefulClose() error {
func (t *http2Client) GracefulClose() {
t.mu.Lock()
// Make sure we move to draining only from active.
if t.state == draining || t.state == closing {
t.mu.Unlock()
return nil
return
}
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
return t.Close()
t.Close()
return
}
t.controlBuf.put(&incomingGoAway{})
return nil
}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
@@ -844,11 +853,11 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
return t.controlBuf.put(df)
}
func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
func (t *http2Client) getStream(f http2.Frame) *Stream {
t.mu.Lock()
defer t.mu.Unlock()
s, ok := t.activeStreams[f.Header().StreamID]
return s, ok
s := t.activeStreams[f.Header().StreamID]
t.mu.Unlock()
return s
}
// adjustWindow sends out extra window update over the initial window size
@@ -928,8 +937,8 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
t.controlBuf.put(bdpPing)
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
s := t.getStream(f)
if s == nil {
return
}
if size > 0 {
@@ -946,9 +955,10 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
}
}
// The server has closed the stream without sending trailers. Record that
@@ -959,8 +969,8 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
}
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
s, ok := t.getStream(f)
if !ok {
s := t.getStream(f)
if s == nil {
return
}
if f.ErrCode == http2.ErrCodeRefusedStream {
@@ -973,9 +983,9 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
statusCode = codes.Unknown
}
if statusCode == codes.Canceled {
// Our deadline was already exceeded, and that was likely the cause of
// this cancelation. Alter the status code accordingly.
if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
// Our deadline was already exceeded, and that was likely the cause
// of this cancelation. Alter the status code accordingly.
statusCode = codes.DeadlineExceeded
}
}
@@ -1080,11 +1090,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
default:
t.setGoAwayReason(f)
close(t.goAway)
t.state = draining
t.controlBuf.put(&incomingGoAway{})
// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
// Notify the clientconn about the GOAWAY before we set the state to
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
t.state = draining
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@@ -1136,32 +1147,30 @@ func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
s, ok := t.getStream(frame)
if !ok {
s := t.getStream(frame)
if s == nil {
return
}
endStream := frame.StreamEnded()
atomic.StoreUint32(&s.bytesReceived, 1)
initialHeader := atomic.SwapUint32(&s.headerDone, 1) == 0
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
if !initialHeader && !endStream {
// As specified by RFC 7540, a HEADERS frame (and associated CONTINUATION frames) can only appear
// at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
return
}
state := &decodeState{}
// Initialize isGRPC value to be !initialHeader, since if a gRPC ResponseHeader has been received
// which indicates peer speaking gRPC, we are in gRPC mode.
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if err := state.decodeHeader(frame); err != nil {
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
return
}
var isHeader bool
isHeader := false
defer func() {
if t.statsHandler != nil {
if isHeader {
@@ -1180,10 +1189,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
}()
// If headers haven't been received yet.
if initialHeader {
// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
if !endStream {
// Headers frame is ResponseHeader.
// HEADERS frame block carries a Response-Headers.
isHeader = true
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
@@ -1192,14 +1201,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(state.data.mdata) > 0 {
s.header = state.data.mdata
}
close(s.headerChan)
return
} else {
// HEADERS frame block carries a Trailers-Only.
s.noHeaders = true
}
// Headers frame is Trailers-only.
s.noHeaders = true
close(s.headerChan)
}
if !endStream {
return
}
// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
@@ -1233,6 +1245,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
@@ -1290,29 +1303,32 @@ func (t *http2Client) keepalive() {
timer.Reset(t.kp.Time)
continue
}
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
if t.state == closing {
// If the transport is closing, we should exit from the
// keepalive goroutine here. If not, we could have a race
// between the call to Signal() from Close() and the call to
// Wait() here, whereby the keepalive goroutine ends up
// blocking on the condition variable which will never be
// signalled again.
t.mu.Unlock()
select {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
case <-t.ctx.Done():
return
}
} else {
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// Send ping.
t.controlBuf.put(p)
return
}
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
t.kpDormant = true
t.kpDormancyCond.Wait()
}
t.kpDormant = false
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// We get here either because we were dormant and a new stream was
// created which unblocked the Wait() call, or because the
// keepalive timer expired. In both cases, we need to send a ping.
t.controlBuf.put(p)
// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
@@ -1320,6 +1336,7 @@ func (t *http2Client) keepalive() {
timer.Reset(t.kp.Time)
continue
}
infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():

View File

@@ -35,9 +35,11 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
@@ -55,13 +57,15 @@ var (
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
// statusRawProto is a function to get to the raw status proto wrapped in a
// status.Status without a proto.Clone().
statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
ctxDone <-chan struct{} // Cache the context.Done() chan
cancel context.CancelFunc
done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
@@ -119,6 +123,7 @@ type http2Server struct {
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
czData *channelzData
bufferPool *bufferPool
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -132,7 +137,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
var isettings []http2.Setting
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
Val: http2MaxFrameLen,
}}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
@@ -197,11 +205,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
t := &http2Server{
ctx: ctx,
cancel: cancel,
ctxDone: ctx.Done(),
ctx: context.Background(),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -220,8 +227,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.ctxDone)
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@@ -352,12 +360,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
s.cancel()
return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
@@ -368,12 +378,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
s.cancel()
return true
}
t.maxStreamID = streamID
@@ -405,9 +417,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -428,6 +441,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
@@ -437,7 +451,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
t.closeStream(s, true, se.Code, nil, false)
t.closeStream(s, true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
@@ -579,7 +593,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
}
if size > 0 {
if err := s.fc.onData(size); err != nil {
t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -591,9 +605,10 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
@@ -604,11 +619,18 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
}
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
s, ok := t.getStream(f)
if !ok {
// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
if s, ok := t.getStream(f); ok {
t.closeStream(s, false, 0, false)
return
}
t.closeStream(s, false, 0, nil, false)
// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
t.controlBuf.put(&cleanupStream{
streamID: f.Header().StreamID,
rst: false,
rstCode: 0,
onWrite: func() {},
})
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -750,6 +772,10 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
return nil
}
func (t *http2Server) setResetPingStrikes() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
@@ -764,15 +790,13 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
streamID: s.id,
hf: headerFields,
endStream: false,
onWrite: func() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
},
onWrite: t.setResetPingStrikes,
})
if !success {
if err != nil {
return err
}
t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
if t.stats != nil {
@@ -810,7 +834,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
if p := st.Proto(); p != nil && len(p.Details) > 0 {
if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
@@ -826,9 +850,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
streamID: s.id,
hf: headerFields,
endStream: true,
onWrite: func() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
},
onWrite: t.setResetPingStrikes,
}
s.hdrMu.Unlock()
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
@@ -836,12 +858,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
if err != nil {
return err
}
t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive
t.closeStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
@@ -865,7 +887,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
case <-t.ctx.Done():
case <-t.done:
return ErrConnClosing
default:
}
@@ -880,16 +902,14 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
df := &dataFrame{
streamID: s.id,
h: hdr,
d: data,
onEachWrite: func() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
},
streamID: s.id,
h: hdr,
d: data,
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
case <-t.ctx.Done():
case <-t.done:
return ErrConnClosing
default:
}
@@ -951,10 +971,11 @@ func (t *http2Server) keepalive() {
select {
case <-maxAge.C:
// Close the connection after grace period.
infof("transport: closing server transport due to maximum connection age.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
case <-t.ctx.Done():
case <-t.done:
}
return
case <-keepalive.C:
@@ -964,6 +985,7 @@ func (t *http2Server) keepalive() {
continue
}
if pingSent {
infof("transport: closing server transport due to idleness.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
@@ -975,7 +997,7 @@ func (t *http2Server) keepalive() {
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
case <-t.ctx.Done():
case <-t.done:
return
}
}
@@ -995,7 +1017,7 @@ func (t *http2Server) Close() error {
t.activeStreams = nil
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
close(t.done)
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
@@ -1013,15 +1035,17 @@ func (t *http2Server) Close() error {
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
t.mu.Lock()
if _, ok := t.activeStreams[s.id]; !ok {
t.mu.Unlock()
return
}
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()
delete(t.activeStreams, s.id)
if len(t.activeStreams) == 0 {
t.idle = time.Now()
t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
delete(t.activeStreams, s.id)
if len(t.activeStreams) == 0 {
t.idle = time.Now()
}
}
t.mu.Unlock()
@@ -1034,51 +1058,36 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
}
}
// closeStream clears the footprint of a stream when the stream is not needed
// any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
// Mark the stream as done
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
oldState := s.swapState(streamDone)
if oldState == streamDone {
// If the stream was already done, return.
return
}
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()
hdr.cleanup = &cleanupStream{
streamID: s.id,
rst: rst,
rstCode: rstCode,
onWrite: func() {
t.deleteStream(s, eosReceived)
},
}
t.controlBuf.put(hdr)
}
// Deletes the stream from active streams
// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
s.swapState(streamDone)
t.deleteStream(s, eosReceived)
cleanup := &cleanupStream{
t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: rst,
rstCode: rstCode,
onWrite: func() {},
}
// No trailer. Puts cleanupFrame into transport's control buffer.
if hdr == nil {
t.controlBuf.put(cleanup)
return
}
// We do the check here, because of the following scenario:
// 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item
// is put to control buffer.
// 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at
// some point. So loopy can't act on trailer
// 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as
// the result of the received RST_STREAM.
// If we do this check at the beginning of the closeStream, then we won't put a cleanup item in
// response to received RST_STREAM into the control buffer and outStream in loopy writer will
// never get cleaned up.
// If the stream is already done, don't send the trailer.
if oldState == streamDone {
return
}
hdr.cleanup = cleanup
t.controlBuf.put(hdr)
})
}
func (t *http2Server) RemoteAddr() net.Addr {
@@ -1148,7 +1157,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
select {
case <-t.drainChan:
case <-timer.C:
case <-t.ctx.Done():
case <-t.done:
return
}
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
@@ -1198,7 +1207,7 @@ func (t *http2Server) getOutFlowWindow() int64 {
select {
case sz := <-resp:
return int64(sz)
case <-t.ctxDone:
case <-t.done:
return -1
case <-timer.C:
return -2

View File

@@ -667,6 +667,7 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
writer: w,
fr: http2.NewFramer(w, r),
}
f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()

View File

@@ -22,6 +22,7 @@
package transport
import (
"bytes"
"context"
"errors"
"fmt"
@@ -39,10 +40,32 @@ import (
"google.golang.org/grpc/tap"
)
type bufferPool struct {
pool sync.Pool
}
func newBufferPool() *bufferPool {
return &bufferPool{
pool: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
}
func (p *bufferPool) get() *bytes.Buffer {
return p.pool.Get().(*bytes.Buffer)
}
func (p *bufferPool) put(b *bytes.Buffer) {
p.pool.Put(b)
}
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
data []byte
buffer *bytes.Buffer
// nil: received some data
// io.EOF: stream is completed. data is nil.
// other non-nil error: transport failure. data is nil.
@@ -117,8 +140,9 @@ type recvBufferReader struct {
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
recv *recvBuffer
last []byte // Stores the remaining data in the previous calls.
last *bytes.Buffer // Stores the remaining data in the previous calls.
err error
freeBuffer func(*bytes.Buffer)
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@@ -128,10 +152,13 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
if r.last != nil && len(r.last) > 0 {
if r.last != nil {
// Read remaining data left in last call.
copied := copy(p, r.last)
r.last = r.last[copied:]
copied, _ := r.last.Read(p)
if r.last.Len() == 0 {
r.freeBuffer(r.last)
r.last = nil
}
return copied, nil
}
if r.closeStream != nil {
@@ -157,6 +184,19 @@ func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
// r.readAdditional acts on that message and returns the necessary error.
select {
case <-r.ctxDone:
// Note that this adds the ctx error to the end of recv buffer, and
// reads from the head. This will delay the error until recv buffer is
// empty, thus will delay ctx cancellation in Recv().
//
// It's done this way to fix a race between ctx cancel and trailer. The
// race was, stream.Recv() may return ctx error if ctxDone wins the
// race, but stream.Trailer() may return a non-nil md because the stream
// was not marked as done when trailer is received. This closeStream
// call will mark stream as done, thus fix the race.
//
// TODO: delaying ctx error seems like a unnecessary side effect. What
// we really want is to mark the stream as done, and return ctx error
// faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readAdditional(m, p)
@@ -170,8 +210,13 @@ func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error
if m.err != nil {
return 0, m.err
}
copied := copy(p, m.data)
r.last = m.data[copied:]
copied, _ := m.buffer.Read(p)
if m.buffer.Len() == 0 {
r.freeBuffer(m.buffer)
r.last = nil
} else {
r.last = m.buffer
}
return copied, nil
}
@@ -204,8 +249,8 @@ type Stream struct {
// is used to adjust flow control, if needed.
requestRead func(int)
headerChan chan struct{} // closed to indicate the end of header metadata.
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
headerChan chan struct{} // closed to indicate the end of header metadata.
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
@@ -266,6 +311,14 @@ func (s *Stream) waitOnHeader() error {
}
select {
case <-s.ctx.Done():
// We prefer success over failure when reading messages because we delay
// context error in stream.Read(). To keep behavior consistent, we also
// prefer success here.
select {
case <-s.headerChan:
return nil
default:
}
return ContextErr(s.ctx.Err())
case <-s.headerChan:
return nil
@@ -578,9 +631,12 @@ type ClientTransport interface {
// is called only once.
Close() error
// GracefulClose starts to tear down the transport. It stops accepting
// new RPCs and wait the completion of the pending RPCs.
GracefulClose() error
// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are
// finished, the transport will close.
//
// It does not block.
GracefulClose()
// Write sends the data for the given stream. A nil stream indicates
// the write is to be performed on the transport as a whole.