TUN-5675: Remove github.com/dgrijalva/jwt-go dependency by upgrading coredns version

This commit is contained in:
cthuang
2022-01-25 13:15:24 +00:00
parent a84cbcde7e
commit 8a5343d0a5
530 changed files with 33042 additions and 14229 deletions

View File

@@ -20,13 +20,17 @@ package transport
import (
"bytes"
"errors"
"fmt"
"runtime"
"strconv"
"sync"
"sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/status"
)
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
@@ -128,6 +132,15 @@ type cleanupStream struct {
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
type earlyAbortStream struct {
httpStatus uint32
streamID uint32
contentSubtype string
status *status.Status
}
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
type dataFrame struct {
streamID uint32
endStream bool
@@ -284,7 +297,7 @@ type controlBuffer struct {
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
trfChan atomic.Value // *chan struct{}
trfChan atomic.Value // chan struct{}
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
@@ -298,10 +311,10 @@ func newControlBuffer(done <-chan struct{}) *controlBuffer {
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
func (c *controlBuffer) throttle() {
ch, _ := c.trfChan.Load().(*chan struct{})
ch, _ := c.trfChan.Load().(chan struct{})
if ch != nil {
select {
case <-*ch:
case <-ch:
case <-c.done:
}
}
@@ -335,8 +348,7 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (b
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
ch := make(chan struct{})
c.trfChan.Store(&ch)
c.trfChan.Store(make(chan struct{}))
}
}
c.mu.Unlock()
@@ -377,9 +389,9 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are removing the frame that put us over the
// threshold; close and clear the throttling channel.
ch := c.trfChan.Load().(*chan struct{})
close(*ch)
c.trfChan.Store((*chan struct{})(nil))
ch := c.trfChan.Load().(chan struct{})
close(ch)
c.trfChan.Store((chan struct{})(nil))
}
c.transportResponseFrames--
}
@@ -395,7 +407,6 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
c.finish()
return nil, ErrConnClosing
}
}
@@ -420,6 +431,14 @@ func (c *controlBuffer) finish() {
hdr.onOrphaned(ErrConnClosing)
}
}
// In case throttle() is currently in flight, it needs to be unblocked.
// Otherwise, the transport may not close, since the transport is closed by
// the reader encountering the connection error.
ch, _ := c.trfChan.Load().(chan struct{})
if ch != nil {
close(ch)
}
c.trfChan.Store((chan struct{})(nil))
c.mu.Unlock()
}
@@ -749,6 +768,27 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
return nil
}
func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
if l.side == clientSide {
return errors.New("earlyAbortStream not handled on client")
}
// In case the caller forgets to set the http status, default to 200.
if eas.httpStatus == 0 {
eas.httpStatus = 200
}
headerFields := []hpack.HeaderField{
{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
}
if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
return err
}
return nil
}
func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
@@ -787,6 +827,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.registerStreamHandler(i)
case *cleanupStream:
return l.cleanupStreamHandler(i)
case *earlyAbortStream:
return l.earlyAbortStreamHandler(i)
case *incomingGoAway:
return l.incomingGoAwayHandler(i)
case *dataFrame:

View File

@@ -136,12 +136,10 @@ type inFlow struct {
// newLimit updates the inflow window to a new value n.
// It assumes that n is always greater than the old limit.
func (f *inFlow) newLimit(n uint32) uint32 {
func (f *inFlow) newLimit(n uint32) {
f.mu.Lock()
d := n - f.limit
f.limit = n
f.mu.Unlock()
return d
}
func (f *inFlow) maybeAdjust(n uint32) uint32 {

View File

@@ -141,9 +141,8 @@ type serverHandlerTransport struct {
stats stats.Handler
}
func (ht *serverHandlerTransport) Close() error {
func (ht *serverHandlerTransport) Close() {
ht.closeOnce.Do(ht.closeCloseChanOnce)
return nil
}
func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }

View File

@@ -24,6 +24,8 @@ import (
"io"
"math"
"net"
"net/http"
"path/filepath"
"strconv"
"strings"
"sync"
@@ -32,13 +34,14 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -59,7 +62,7 @@ type http2Client struct {
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
userAgent string
md interface{}
md metadata.MD
conn net.Conn // underlying communication channel
loopy *loopyWriter
remoteAddr net.Addr
@@ -114,6 +117,9 @@ type http2Client struct {
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
// goAwayDebugMessage contains a detailed human readable string about a
// GoAway frame, useful for error messages.
goAwayDebugMessage string
// A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And
@@ -137,11 +143,34 @@ type http2Client struct {
connectionID uint64
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
address := addr.Addr
networkType, ok := networktype.Get(addr)
if fn != nil {
return fn(ctx, addr)
// Special handling for unix scheme with custom dialer. Back in the day,
// we did not have a unix resolver and therefore targets with a unix
// scheme would end up using the passthrough resolver. So, user's used a
// custom dialer in this case and expected the original dial target to
// be passed to the custom dialer. Now, we have a unix resolver. But if
// a custom dialer is specified, we want to retain the old behavior in
// terms of the address being passed to the custom dialer.
if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
// Supported unix targets are either "unix://absolute-path" or
// "unix:relative-path".
if filepath.IsAbs(address) {
return fn(ctx, "unix://"+address)
}
return fn(ctx, "unix:"+address)
}
return fn(ctx, address)
}
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
if !ok {
networkType, address = parseDialTarget(address)
}
if networkType == "tcp" && useProxy {
return proxyDial(ctx, address, grpcUA)
}
return (&net.Dialer{}).DialContext(ctx, networkType, address)
}
func isTemporary(err error) bool {
@@ -172,7 +201,13 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}()
conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
// gRPC, resolver, balancer etc. can specify arbitrary data in the
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the dialer and credential handshaker. This makes it possible for
// address specific arbitrary data to reach custom dialers and credential handshakers.
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
@@ -216,16 +251,30 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}
if transportCreds != nil {
// gRPC, resolver, balancer etc. can specify arbitrary data in the
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the credential handshaker. This makes it possible for
// address specific arbitrary data to reach the credential handshaker.
contextWithHandshakeInfo := internal.NewClientHandshakeInfoContext.(func(context.Context, credentials.ClientHandshakeInfo) context.Context)
connectCtx = contextWithHandshakeInfo(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
rawConn := conn
// Pull the deadline from the connectCtx, which will be used for
// timeouts in the authentication protocol handshake. Can ignore the
// boolean as the deadline will return the zero value, which will make
// the conn not timeout on I/O operations.
deadline, _ := connectCtx.Deadline()
rawConn.SetDeadline(deadline)
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)
rawConn.SetDeadline(time.Time{})
if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
}
for _, cd := range perRPCCreds {
if cd.RequireTransportSecurity() {
if ci, ok := authInfo.(interface {
GetCommonAuthInfo() credentials.CommonAuthInfo
}); ok {
secLevel := ci.GetCommonAuthInfo().SecurityLevel
if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
}
}
}
}
isSecure = true
if transportCreds.Info().SecurityProtocol == "tls" {
scheme = "https"
@@ -248,7 +297,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
md: addr.Metadata,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -276,6 +324,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
}
if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
} else if md := imetadata.Get(addr); md != nil {
t.md = md
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
@@ -312,12 +366,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != len(clientPreface) {
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err
}
var ss []http2.Setting
@@ -335,14 +391,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}
@@ -359,11 +417,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
// If it's a connection error, let reader goroutine handle it
// since there might be data in the buffers.
if _, ok := err.(net.Error); !ok {
t.conn.Close()
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
return t, nil
@@ -379,6 +436,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
buf: newRecvBuffer(),
headerChan: make(chan struct{}),
contentSubtype: callHdr.ContentSubtype,
doneFunc: callHdr.DoneFunc,
}
s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
@@ -418,7 +476,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
Method: callHdr.Method,
AuthInfo: t.authInfo,
}
ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
if err != nil {
return nil, err
@@ -481,25 +539,23 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
for _, vv := range added {
for i, v := range vv {
if i%2 == 0 {
k = v
k = strings.ToLower(v)
continue
}
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
for k, vv := range t.md {
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
return headerFields, nil
@@ -532,7 +588,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
return nil, err
}
return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
}
for k, v := range data {
// Capital header names are illegal in HTTP/2.
@@ -549,8 +605,11 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
if callCreds := callHdr.Creds; callCreds != nil {
if !t.isSecure && callCreds.RequireTransportSecurity() {
return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
if callCreds.RequireTransportSecurity() {
ri, _ := credentials.RequestInfoFromContext(ctx)
if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
}
}
data, err := callCreds.GetRequestMetadata(ctx, audience)
if err != nil {
@@ -566,26 +625,35 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
return callAuthData, nil
}
// PerformedIOError wraps an error to indicate IO may have been performed
// before the error occurred.
type PerformedIOError struct {
// NewStreamError wraps an error and reports additional information. Typically
// NewStream errors result in transparent retry, as they mean nothing went onto
// the wire. However, there are two notable exceptions:
//
// 1. If the stream headers violate the max header list size allowed by the
// server. In this case there is no reason to retry at all, as it is
// assumed the RPC would continue to fail on subsequent attempts.
// 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can
// eventually succeed, would need I/O to do so.
type NewStreamError struct {
Err error
DoNotRetry bool
DoNotTransparentRetry bool
}
// Error implements error.
func (p PerformedIOError) Error() string {
return p.Err.Error()
func (e NewStreamError) Error() string {
return e.Err.Error()
}
// NewStream creates a stream and registers it into the transport as "active"
// streams.
// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
ctx = peer.NewContext(ctx, t.getPeer())
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
// We may have performed I/O in the per-RPC creds callback, so do not
// allow transparent retry.
return nil, PerformedIOError{err}
return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
}
s := t.newStream(ctx, callHdr)
cleanup := func(err error) {
@@ -685,23 +753,23 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return true
}, hdr)
if err != nil {
return nil, err
return nil, &NewStreamError{Err: err}
}
if success {
break
}
if hdrListSizeErr != nil {
return nil, hdrListSizeErr
return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
}
firstTry = false
select {
case <-ch:
case <-s.ctx.Done():
return nil, ContextErr(s.ctx.Err())
case <-ctx.Done():
return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
case <-t.goAway:
return nil, errStreamDrain
return nil, &NewStreamError{Err: errStreamDrain}
case <-t.ctx.Done():
return nil, ErrConnClosing
return nil, &NewStreamError{Err: ErrConnClosing}
}
}
if t.statsHandler != nil {
@@ -796,6 +864,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
// This will unblock write.
close(s.done)
if s.doneFunc != nil {
s.doneFunc()
}
}
// Close kicks off the shutdown process of the transport. This should be called
@@ -805,12 +876,12 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
t.mu.Unlock()
return nil
return
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
@@ -826,13 +897,25 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
var st *status.Status
if len(goAwayDebugMessage) > 0 {
st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
err = st.Err()
} else {
st = status.New(codes.Unavailable, err.Error())
}
// Notify all active streams.
for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
@@ -840,7 +923,6 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return err
}
// GracefulClose sets the state to draining, which prevents new streams from
@@ -859,7 +941,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(ErrConnClosing)
return
}
t.controlBuf.put(&incomingGoAway{})
@@ -1000,7 +1082,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
}
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
if f.StreamEnded() {
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}
}
@@ -1105,9 +1187,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
}
}
id := f.LastStreamID
if id > 0 && id%2 != 1 {
if id > 0 && id%2 == 0 {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
@@ -1125,7 +1207,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
}
default:
@@ -1155,7 +1237,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
}
@@ -1171,12 +1253,17 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayTooManyPings
}
}
if len(f.DebugData()) == 0 {
t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
} else {
t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
}
}
func (t *http2Client) GetGoAwayReason() GoAwayReason {
func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
t.mu.Lock()
defer t.mu.Unlock()
return t.goAwayReason
return t.goAwayReason, t.goAwayDebugMessage
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@@ -1203,35 +1290,128 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}
state := &decodeState{}
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if err := state.decodeHeader(frame); err != nil {
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
if frame.Truncated {
se := status.New(codes.Internal, "peer header list size exceeded limit")
t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
return
}
var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = !initialHeader
mdata = make(map[string][]string)
contentTypeErr = "malformed header: missing HTTP content-type"
grpcMessage string
statusGen *status.Status
recvCompress string
httpStatusCode *int
httpStatusErr string
rawStatusCode = codes.Unknown
// headerError is set if an error is encountered while parsing the headers
headerError string
)
if initialHeader {
httpStatusErr = "malformed header: missing HTTP status"
}
for _, hf := range frame.Fields {
switch hf.Name {
case "content-type":
if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
break
}
contentTypeErr = ""
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
isGRPC = true
case "grpc-encoding":
recvCompress = hf.Value
case "grpc-status":
code, err := strconv.ParseInt(hf.Value, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
rawStatusCode = codes.Code(uint32(code))
case "grpc-message":
grpcMessage = decodeGrpcMessage(hf.Value)
case "grpc-status-details-bin":
var err error
statusGen, err = decodeGRPCStatusDetails(hf.Value)
if err != nil {
headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
}
case ":status":
if hf.Value == "200" {
httpStatusErr = ""
statusCode := 200
httpStatusCode = &statusCode
break
}
c, err := strconv.ParseInt(hf.Value, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
statusCode := int(c)
httpStatusCode = &statusCode
httpStatusErr = fmt.Sprintf(
"unexpected HTTP status code received from server: %d (%s)",
statusCode,
http.StatusText(statusCode),
)
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
}
}
if !isGRPC || httpStatusErr != "" {
var code = codes.Internal // when header does not include HTTP status, return INTERNAL
if httpStatusCode != nil {
var ok bool
code, ok = HTTPStatusConvTab[*httpStatusCode]
if !ok {
code = codes.Unknown
}
}
var errs []string
if httpStatusErr != "" {
errs = append(errs, httpStatusErr)
}
if contentTypeErr != "" {
errs = append(errs, contentTypeErr)
}
// Verify the HTTP response is a 200.
se := status.New(code, strings.Join(errs, "; "))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
if headerError != "" {
se := status.New(codes.Internal, headerError)
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
isHeader := false
defer func() {
if t.statsHandler != nil {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
Header: s.header.Copy(),
Compression: s.recvCompress,
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: s.trailer.Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
}
}
}()
// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
@@ -1242,9 +1422,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
s.recvCompress = state.data.encoding
if len(state.data.mdata) > 0 {
s.header = state.data.mdata
s.recvCompress = recvCompress
if len(mdata) > 0 {
s.header = mdata
}
} else {
// HEADERS frame block carries a Trailers-Only.
@@ -1253,13 +1433,36 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
close(s.headerChan)
}
if t.statsHandler != nil {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress,
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
}
}
if !endStream {
return
}
if statusGen == nil {
statusGen = status.New(rawStatusCode, grpcMessage)
}
// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
}
// reader runs as a separate goroutine in charge of reading data from network
@@ -1273,7 +1476,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close() // this kicks off resetTransport, so must be last before return
err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
@@ -1282,7 +1486,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close() // this kicks off resetTransport, so must be last before return
// this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
return
}
t.onPrefaceReceipt()
@@ -1306,13 +1511,19 @@ func (t *http2Client) reader() {
if s != nil {
// use error detail to provide better err message
code := http2ErrConvTab[se.Code]
msg := t.framer.fr.ErrorDetail().Error()
errorDetail := t.framer.fr.ErrorDetail()
var msg string
if errorDetail != nil {
msg = errorDetail.Error()
} else {
msg = "received invalid frame"
}
t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
}
continue
} else {
// Transport error.
t.Close()
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}
@@ -1346,7 +1557,7 @@ func minTime(a, b time.Duration) time.Duration {
return b
}
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
@@ -1371,7 +1582,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()

View File

@@ -26,6 +26,7 @@ import (
"io"
"math"
"net"
"net/http"
"strconv"
"sync"
"sync/atomic"
@@ -72,7 +73,6 @@ type http2Server struct {
writerDone chan struct{} // sync point to enable testing.
remoteAddr net.Addr
localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
framer *framer
@@ -101,11 +101,11 @@ type http2Server struct {
mu sync.Mutex // guard the following
// drainChan is initialized when drain(...) is called the first time.
// drainChan is initialized when Drain() is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
// Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct{}
state transportState
@@ -122,11 +122,37 @@ type http2Server struct {
bufferPool *bufferPool
connectionID uint64
// maxStreamMu guards the maximum stream ID
// This lock may not be taken if mu is already held.
maxStreamMu sync.Mutex
maxStreamID uint32 // max stream ID ever seen
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
// NewServerTransport creates a http2 transport with conn and configuration
// options from config.
//
// It returns a non-nil transport and a nil error on success. On failure, it
// returns a nil transport and a non-nil error. For a special case where the
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
var authInfo credentials.AuthInfo
rawConn := conn
if config.Credentials != nil {
var err error
conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
if err != nil {
// ErrConnDispatched means that the connection was dispatched away
// from gRPC; those connections should be left open. io.EOF means
// the connection was closed before handshaking completed, which can
// happen naturally from probers. Return these errors directly.
if err == credentials.ErrConnDispatched || err == io.EOF {
return nil, err
}
return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
}
}
writeBufSize := config.WriteBufferSize
readBufSize := config.ReadBufferSize
maxHeaderListSize := defaultServerMaxHeaderListSize
@@ -209,14 +235,15 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
done := make(chan struct{})
t := &http2Server{
ctx: context.Background(),
ctx: setConnection(context.Background(), rawConn),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: config.AuthInfo,
authInfo: authInfo,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
@@ -265,6 +292,14 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
// In deployments where a gRPC server runs behind a cloud load balancer
// which performs regular TCP level health checks, the connection is
// closed immediately by the latter. Returning io.EOF here allows the
// grpc server implementation to recognize this scenario and suppress
// logging to reduce spam.
if err == io.EOF {
return nil, io.EOF
}
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
}
if !bytes.Equal(preface, clientPreface) {
@@ -294,6 +329,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
}
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
go t.keepalive()
@@ -302,38 +338,144 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
streamID := frame.Header().StreamID
state := &decodeState{
serverSide: true,
}
if err := state.decodeHeader(frame); err != nil {
if se, ok := status.FromError(err); ok {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: statusCodeConvTab[se.Code()],
onWrite: func() {},
})
}
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
if frame.Truncated {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeFrameSize,
onWrite: func() {},
})
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
return true
}
t.maxStreamID = streamID
buf := newRecvBuffer()
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
recvCompress: state.data.encoding,
method: state.data.method,
contentSubtype: state.data.contentSubtype,
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = false
mdata = make(map[string][]string)
httpMethod string
// headerError is set if an error is encountered while parsing the headers
headerError bool
timeoutSet bool
timeout time.Duration
)
for _, hf := range frame.Fields {
switch hf.Name {
case "content-type":
contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
if !validContentType {
break
}
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
s.contentSubtype = contentSubtype
isGRPC = true
case "grpc-encoding":
s.recvCompress = hf.Value
case ":method":
httpMethod = hf.Value
case ":path":
s.method = hf.Value
case "grpc-timeout":
timeoutSet = true
var err error
if timeout, err = decodeTimeout(hf.Value); err != nil {
headerError = true
}
// "Transports must consider requests containing the Connection header
// as malformed." - A41
case "connection":
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
}
headerError = true
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = true
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
}
}
// "If multiple Host headers or multiple :authority headers are present, the
// request must be rejected with an HTTP status code 400 as required by Host
// validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
// error, this takes precedence over a client not speaking gRPC.
if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
if logger.V(logLevel) {
logger.Errorf("transport: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 400,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
})
return false
}
if !isGRPC || headerError {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
})
return false
}
// "If :authority is missing, Host must be renamed to :authority." - A41
if len(mdata[":authority"]) == 0 {
// No-op if host isn't present, no eventual :authority header is a valid
// RPC.
if host, ok := mdata["host"]; ok {
mdata[":authority"] = host
delete(mdata, "host")
}
} else {
// "If :authority is present, Host must be discarded" - A41
delete(mdata, "host")
}
if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
}
if state.data.timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
if timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
@@ -346,33 +488,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context.
if len(state.data.mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
}
if state.data.statsTags != nil {
s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
}
if state.data.statsTrace != nil {
s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
}
if t.inTapHandle != nil {
var err error
info := &tap.Info{
FullMethodName: state.data.method,
if len(mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
}
t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: true,
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
}
}
t.mu.Lock()
@@ -392,16 +514,40 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
if httpMethod != http.MethodPost {
t.mu.Unlock()
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
}
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
})
s.cancel()
return true
return false
}
if t.inTapHandle != nil {
var err error
if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
t.mu.Unlock()
if logger.V(logLevel) {
logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
}
stat, ok := status.FromError(err)
if !ok {
stat = status.New(codes.PermissionDenied, err.Error())
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 200,
streamID: s.id,
contentSubtype: s.contentSubtype,
status: stat,
})
return false
}
}
t.maxStreamID = streamID
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
@@ -423,7 +569,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
Header: metadata.MD(state.data.mdata).Copy(),
Header: metadata.MD(mdata).Copy(),
}
t.stats.HandleRPC(s.ctx, inHeader)
}
@@ -635,7 +781,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
s.write(recvMsg{buffer: buffer})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
if f.StreamEnded() {
// Received the end of stream from the client.
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
@@ -990,12 +1136,12 @@ func (t *http2Server) keepalive() {
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.drain(http2.ErrCodeNo, []byte{})
t.Drain()
return
}
idleTimer.Reset(val)
case <-ageTimer.C:
t.drain(http2.ErrCodeNo, []byte{})
t.Drain()
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-ageTimer.C:
@@ -1049,11 +1195,11 @@ func (t *http2Server) keepalive() {
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() error {
func (t *http2Server) Close() {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return errors.New("transport: Close() was already called")
return
}
t.state = closing
streams := t.activeStreams
@@ -1061,7 +1207,9 @@ func (t *http2Server) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
close(t.done)
err := t.conn.Close()
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
logger.Infof("transport: error closing conn during Close: %v", err)
}
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
@@ -1073,7 +1221,6 @@ func (t *http2Server) Close() error {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
}
return err
}
// deleteStream deletes the stream s from transport's active streams.
@@ -1138,17 +1285,13 @@ func (t *http2Server) RemoteAddr() net.Addr {
}
func (t *http2Server) Drain() {
t.drain(http2.ErrCodeNo, []byte{})
}
func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainChan != nil {
return
}
t.drainChan = make(chan struct{})
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
}
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
@@ -1156,20 +1299,23 @@ var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.maxStreamMu.Lock()
t.mu.Lock()
if t.state == closing { // TODO(mmukhi): This seems unnecessary.
t.mu.Unlock()
t.maxStreamMu.Unlock()
// The transport is closing.
return false, ErrConnClosing
}
sid := t.maxStreamID
if !g.headsUp {
// Stop accepting more streams now.
t.state = draining
sid := t.maxStreamID
if len(t.activeStreams) == 0 {
g.closeConn = true
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
@@ -1182,6 +1328,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return true, nil
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have
@@ -1266,3 +1413,18 @@ func getJitter(v time.Duration) time.Duration {
j := grpcrand.Int63n(2*r) - r
return time.Duration(j)
}
type connectionKey struct{}
// GetConnection gets the connection from the context.
func GetConnection(ctx context.Context) net.Conn {
conn, _ := ctx.Value(connectionKey{}).(net.Conn)
return conn
}
// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func setConnection(ctx context.Context, conn net.Conn) context.Context {
return context.WithValue(ctx, connectionKey{}, conn)
}

View File

@@ -27,6 +27,7 @@ import (
"math"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
@@ -38,7 +39,6 @@ import (
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/status"
)
@@ -73,13 +73,6 @@ var (
http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
http2.ErrCodeHTTP11Required: codes.Internal,
}
statusCodeConvTab = map[codes.Code]http2.ErrCode{
codes.Internal: http2.ErrCodeInternal,
codes.Canceled: http2.ErrCodeCancel,
codes.Unavailable: http2.ErrCodeRefusedStream,
codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
codes.PermissionDenied: http2.ErrCodeInadequateSecurity,
}
// HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table.
HTTPStatusConvTab = map[int]codes.Code{
// 400 Bad Request - INTERNAL.
@@ -102,52 +95,6 @@ var (
logger = grpclog.Component("transport")
)
type parsedHeaderData struct {
encoding string
// statusGen caches the stream status received from the trailer the server
// sent. Client side only. Do not access directly. After all trailers are
// parsed, use the status method to retrieve the status.
statusGen *status.Status
// rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
// intended for direct access outside of parsing.
rawStatusCode *int
rawStatusMsg string
httpStatus *int
// Server side only fields.
timeoutSet bool
timeout time.Duration
method string
// key-value metadata map from the peer.
mdata map[string][]string
statsTags []byte
statsTrace []byte
contentSubtype string
// isGRPC field indicates whether the peer is speaking gRPC (otherwise HTTP).
//
// We are in gRPC mode (peer speaking gRPC) if:
// * We are client side and have already received a HEADER frame that indicates gRPC peer.
// * The header contains valid a content-type, i.e. a string starts with "application/grpc"
// And we should handle error specific to gRPC.
//
// Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we
// are in HTTP fallback mode, and should handle error specific to HTTP.
isGRPC bool
grpcErr error
httpErr error
contentTypeErr string
}
// decodeState configures decoding criteria and records the decoded data.
type decodeState struct {
// whether decoding on server side or not
serverSide bool
// Records the states during HPACK decoding. It will be filled with info parsed from HTTP HEADERS
// frame once decodeHeader function has been invoked and returned.
data parsedHeaderData
}
// isReservedHeader checks whether hdr belongs to HTTP2 headers
// reserved by gRPC protocol. Any other headers are classified as the
// user-specified metadata.
@@ -185,14 +132,6 @@ func isWhitelistedHeader(hdr string) bool {
}
}
func (d *decodeState) status() *status.Status {
if d.data.statusGen == nil {
// No status-details were provided; generate status using code/msg.
d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
}
return d.data.statusGen
}
const binHdrSuffix = "-bin"
func encodeBinHeader(v []byte) string {
@@ -222,166 +161,16 @@ func decodeMetadataHeader(k, v string) (string, error) {
return v, nil
}
func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error {
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
if frame.Truncated {
return status.Error(codes.Internal, "peer header list size exceeded limit")
func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) {
v, err := decodeBinHeader(rawDetails)
if err != nil {
return nil, err
}
for _, hf := range frame.Fields {
d.processHeaderField(hf)
}
if d.data.isGRPC {
if d.data.grpcErr != nil {
return d.data.grpcErr
}
if d.serverSide {
return nil
}
if d.data.rawStatusCode == nil && d.data.statusGen == nil {
// gRPC status doesn't exist.
// Set rawStatusCode to be unknown and return nil error.
// So that, if the stream has ended this Unknown status
// will be propagated to the user.
// Otherwise, it will be ignored. In which case, status from
// a later trailer, that has StreamEnded flag set, is propagated.
code := int(codes.Unknown)
d.data.rawStatusCode = &code
}
return nil
}
// HTTP fallback mode
if d.data.httpErr != nil {
return d.data.httpErr
}
var (
code = codes.Internal // when header does not include HTTP status, return INTERNAL
ok bool
)
if d.data.httpStatus != nil {
code, ok = HTTPStatusConvTab[*(d.data.httpStatus)]
if !ok {
code = codes.Unknown
}
}
return status.Error(code, d.constructHTTPErrMsg())
}
// constructErrMsg constructs error message to be returned in HTTP fallback mode.
// Format: HTTP status code and its corresponding message + content-type error message.
func (d *decodeState) constructHTTPErrMsg() string {
var errMsgs []string
if d.data.httpStatus == nil {
errMsgs = append(errMsgs, "malformed header: missing HTTP status")
} else {
errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus))
}
if d.data.contentTypeErr == "" {
errMsgs = append(errMsgs, "transport: missing content-type field")
} else {
errMsgs = append(errMsgs, d.data.contentTypeErr)
}
return strings.Join(errMsgs, "; ")
}
func (d *decodeState) addMetadata(k, v string) {
if d.data.mdata == nil {
d.data.mdata = make(map[string][]string)
}
d.data.mdata[k] = append(d.data.mdata[k], v)
}
func (d *decodeState) processHeaderField(f hpack.HeaderField) {
switch f.Name {
case "content-type":
contentSubtype, validContentType := grpcutil.ContentSubtype(f.Value)
if !validContentType {
d.data.contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", f.Value)
return
}
d.data.contentSubtype = contentSubtype
// TODO: do we want to propagate the whole content-type in the metadata,
// or come up with a way to just propagate the content-subtype if it was set?
// ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
// in the metadata?
d.addMetadata(f.Name, f.Value)
d.data.isGRPC = true
case "grpc-encoding":
d.data.encoding = f.Value
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
return
}
d.data.rawStatusCode = &code
case "grpc-message":
d.data.rawStatusMsg = decodeGrpcMessage(f.Value)
case "grpc-status-details-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
return
}
s := &spb.Status{}
if err := proto.Unmarshal(v, s); err != nil {
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
return
}
d.data.statusGen = status.FromProto(s)
case "grpc-timeout":
d.data.timeoutSet = true
var err error
if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
}
case ":path":
d.data.method = f.Value
case ":status":
code, err := strconv.Atoi(f.Value)
if err != nil {
d.data.httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
return
}
d.data.httpStatus = &code
case "grpc-tags-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
return
}
d.data.statsTags = v
d.addMetadata(f.Name, string(v))
case "grpc-trace-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
return
}
d.data.statsTrace = v
d.addMetadata(f.Name, string(v))
default:
if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
break
}
v, err := decodeMetadataHeader(f.Name, f.Value)
if err != nil {
if logger.V(logLevel) {
logger.Errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
}
return
}
d.addMetadata(f.Name, v)
st := &spb.Status{}
if err = proto.Unmarshal(v, st); err != nil {
return nil, err
}
return status.FromProto(st), nil
}
type timeoutUnit uint8
@@ -605,3 +394,31 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}
// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
net := "tcp"
m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")
// handle unix:addr which will fail with url.Parse
if m1 >= 0 && m2 < 0 {
if n := target[0:m1]; n == "unix" {
return n, target[m1+1:]
}
}
if m2 >= 0 {
t, err := url.Parse(target)
if err != nil {
return net, target
}
scheme := t.Scheme
addr := t.Path
if scheme == "unix" {
if addr == "" {
addr = t.Host
}
return scheme, addr
}
}
return net, target
}

View File

@@ -0,0 +1,46 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package networktype declares the network type to be used in the default
// dialer. Attribute of a resolver.Address.
package networktype
import (
"google.golang.org/grpc/resolver"
)
// keyType is the key to use for storing State in Attributes.
type keyType string
const key = keyType("grpc.internal.transport.networktype")
// Set returns a copy of the provided address with attributes containing networkType.
func Set(address resolver.Address, networkType string) resolver.Address {
address.Attributes = address.Attributes.WithValue(key, networkType)
return address
}
// Get returns the network type in the resolver.Address and true, or "", false
// if not present.
func Get(address resolver.Address) (string, bool) {
v := address.Attributes.Value(key)
if v == nil {
return "", false
}
return v.(string), true
}

View File

@@ -0,0 +1,142 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
"bufio"
"context"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
)
const proxyAuthHeaderKey = "Proxy-Authorization"
var (
// The following variable will be overwritten in the tests.
httpProxyFromEnvironment = http.ProxyFromEnvironment
)
func mapAddress(address string) (*url.URL, error) {
req := &http.Request{
URL: &url.URL{
Scheme: "https",
Host: address,
},
}
url, err := httpProxyFromEnvironment(req)
if err != nil {
return nil, err
}
return url, nil
}
// To read a response from a net.Conn, http.ReadResponse() takes a bufio.Reader.
// It's possible that this reader reads more than what's need for the response and stores
// those bytes in the buffer.
// bufConn wraps the original net.Conn and the bufio.Reader to make sure we don't lose the
// bytes in the buffer.
type bufConn struct {
net.Conn
r io.Reader
}
func (c *bufConn) Read(b []byte) (int, error) {
return c.r.Read(b)
}
func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
}
}()
req := &http.Request{
Method: http.MethodConnect,
URL: &url.URL{Host: backendAddr},
Header: map[string][]string{"User-Agent": {grpcUA}},
}
if t := proxyURL.User; t != nil {
u := t.Username()
p, _ := t.Password()
req.Header.Add(proxyAuthHeaderKey, "Basic "+basicAuth(u, p))
}
if err := sendHTTPRequest(ctx, req, conn); err != nil {
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
}
r := bufio.NewReader(conn)
resp, err := http.ReadResponse(r, req)
if err != nil {
return nil, fmt.Errorf("reading server HTTP response: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
dump, err := httputil.DumpResponse(resp, true)
if err != nil {
return nil, fmt.Errorf("failed to do connect handshake, status code: %s", resp.Status)
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}
return &bufConn{Conn: conn, r: r}, nil
}
// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy
// is necessary, dials, does the HTTP CONNECT handshake, and returns the
// connection.
func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
newAddr := addr
proxyURL, err := mapAddress(addr)
if err != nil {
return nil, err
}
if proxyURL != nil {
newAddr = proxyURL.Host
}
conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr)
if err != nil {
return
}
if proxyURL != nil {
// proxy is disabled if proxyURL is nil.
conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
}
return
}
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
req = req.WithContext(ctx)
if err := req.Write(conn); err != nil {
return fmt.Errorf("failed to write the HTTP request: %v", err)
}
return nil
}

View File

@@ -30,6 +30,7 @@ import (
"net"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -241,6 +242,7 @@ type Stream struct {
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
doneFunc func() // invoked at the end of stream on client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
method string // the associated RPC method of the stream
recvCompress string
@@ -517,7 +519,8 @@ const (
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
MaxStreams uint32
AuthInfo credentials.AuthInfo
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
@@ -531,12 +534,6 @@ type ServerConfig struct {
HeaderTableSize *uint32
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
return newHTTP2Server(conn, config)
}
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
@@ -569,6 +566,8 @@ type ConnectOptions struct {
ChannelzParentID int64
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
UseProxy bool
}
// NewClientTransport establishes the transport with the required ConnectOptions
@@ -609,6 +608,8 @@ type CallHdr struct {
ContentSubtype string
PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
DoneFunc func() // called when the stream is finished
}
// ClientTransport is the common interface for all gRPC client-side transport
@@ -617,7 +618,7 @@ type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this
// is called only once.
Close() error
Close(err error)
// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are
@@ -651,8 +652,9 @@ type ClientTransport interface {
// HTTP/2).
GoAway() <-chan struct{}
// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
// GetGoAwayReason returns the reason why GoAway frame was received, along
// with a human readable string with debug info.
GetGoAwayReason() (GoAwayReason, string)
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
@@ -688,7 +690,7 @@ type ServerTransport interface {
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close() error
Close()
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr