mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-28 14:59:58 +00:00
TUN-9467: bump coredns to solve CVE
* TUN-9467: bump coredns to solve CVE
This commit is contained in:

committed by
João "Pisco" Fernandes

parent
f8d12c9d39
commit
a408612f26
144
vendor/google.golang.org/grpc/internal/transport/client_stream.go
generated
vendored
Normal file
144
vendor/google.golang.org/grpc/internal/transport/client_stream.go
generated
vendored
Normal file
@@ -0,0 +1,144 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2024 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ClientStream implements streaming functionality for a gRPC client.
|
||||
type ClientStream struct {
|
||||
*Stream // Embed for common stream functionality.
|
||||
|
||||
ct *http2Client
|
||||
done chan struct{} // closed at the end of stream to unblock writers.
|
||||
doneFunc func() // invoked at the end of stream.
|
||||
|
||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||
// headerValid indicates whether a valid header was received. Only
|
||||
// meaningful after headerChan is closed (always call waitOnHeader() before
|
||||
// reading its value).
|
||||
headerValid bool
|
||||
header metadata.MD // the received header metadata
|
||||
noHeaders bool // set if the client never received headers (set only after the stream is done).
|
||||
|
||||
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
|
||||
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
|
||||
|
||||
status *status.Status // the status error received from the server
|
||||
}
|
||||
|
||||
// Read reads an n byte message from the input stream.
|
||||
func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
|
||||
b, err := s.Stream.read(n)
|
||||
if err == nil {
|
||||
s.ct.incrMsgRecv()
|
||||
}
|
||||
return b, err
|
||||
}
|
||||
|
||||
// Close closes the stream and propagates err to any readers.
|
||||
func (s *ClientStream) Close(err error) {
|
||||
var (
|
||||
rst bool
|
||||
rstCode http2.ErrCode
|
||||
)
|
||||
if err != nil {
|
||||
rst = true
|
||||
rstCode = http2.ErrCodeCancel
|
||||
}
|
||||
s.ct.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
|
||||
}
|
||||
|
||||
// Write writes the hdr and data bytes to the output stream.
|
||||
func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
|
||||
return s.ct.write(s, hdr, data, opts)
|
||||
}
|
||||
|
||||
// BytesReceived indicates whether any bytes have been received on this stream.
|
||||
func (s *ClientStream) BytesReceived() bool {
|
||||
return s.bytesReceived.Load()
|
||||
}
|
||||
|
||||
// Unprocessed indicates whether the server did not process this stream --
|
||||
// i.e. it sent a refused stream or GOAWAY including this stream ID.
|
||||
func (s *ClientStream) Unprocessed() bool {
|
||||
return s.unprocessed.Load()
|
||||
}
|
||||
|
||||
func (s *ClientStream) waitOnHeader() {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
// Close the stream to prevent headers/trailers from changing after
|
||||
// this function returns.
|
||||
s.Close(ContextErr(s.ctx.Err()))
|
||||
// headerChan could possibly not be closed yet if closeStream raced
|
||||
// with operateHeaders; wait until it is closed explicitly here.
|
||||
<-s.headerChan
|
||||
case <-s.headerChan:
|
||||
}
|
||||
}
|
||||
|
||||
// RecvCompress returns the compression algorithm applied to the inbound
|
||||
// message. It is empty string if there is no compression applied.
|
||||
func (s *ClientStream) RecvCompress() string {
|
||||
s.waitOnHeader()
|
||||
return s.recvCompress
|
||||
}
|
||||
|
||||
// Done returns a channel which is closed when it receives the final status
|
||||
// from the server.
|
||||
func (s *ClientStream) Done() <-chan struct{} {
|
||||
return s.done
|
||||
}
|
||||
|
||||
// Header returns the header metadata of the stream. Acquires the key-value
|
||||
// pairs of header metadata once it is available. It blocks until i) the
|
||||
// metadata is ready or ii) there is no header metadata or iii) the stream is
|
||||
// canceled/expired.
|
||||
func (s *ClientStream) Header() (metadata.MD, error) {
|
||||
s.waitOnHeader()
|
||||
|
||||
if !s.headerValid || s.noHeaders {
|
||||
return nil, s.status.Err()
|
||||
}
|
||||
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
|
||||
// TrailersOnly blocks until a header or trailers-only frame is received and
|
||||
// then returns true if the stream was trailers-only. If the stream ends
|
||||
// before headers are received, returns true, nil.
|
||||
func (s *ClientStream) TrailersOnly() bool {
|
||||
s.waitOnHeader()
|
||||
return s.noHeaders
|
||||
}
|
||||
|
||||
// Status returns the status received from the server.
|
||||
// Status can be read safely only after the stream has ended,
|
||||
// that is, after Done() is closed.
|
||||
func (s *ClientStream) Status() *status.Status {
|
||||
return s.status
|
||||
}
|
289
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
289
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
@@ -32,6 +32,7 @@ import (
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
@@ -148,9 +149,9 @@ type dataFrame struct {
|
||||
streamID uint32
|
||||
endStream bool
|
||||
h []byte
|
||||
d []byte
|
||||
reader mem.Reader
|
||||
// onEachWrite is called every time
|
||||
// a part of d is written out.
|
||||
// a part of data is written out.
|
||||
onEachWrite func()
|
||||
}
|
||||
|
||||
@@ -193,7 +194,7 @@ type goAway struct {
|
||||
code http2.ErrCode
|
||||
debugData []byte
|
||||
headsUp bool
|
||||
closeConn error // if set, loopyWriter will exit, resulting in conn closure
|
||||
closeConn error // if set, loopyWriter will exit with this error
|
||||
}
|
||||
|
||||
func (*goAway) isTransportResponseFrame() bool { return false }
|
||||
@@ -289,18 +290,22 @@ func (l *outStreamList) dequeue() *outStream {
|
||||
}
|
||||
|
||||
// controlBuffer is a way to pass information to loopy.
|
||||
// Information is passed as specific struct types called control frames.
|
||||
// A control frame not only represents data, messages or headers to be sent out
|
||||
// but can also be used to instruct loopy to update its internal state.
|
||||
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
|
||||
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
|
||||
//
|
||||
// Information is passed as specific struct types called control frames. A
|
||||
// control frame not only represents data, messages or headers to be sent out
|
||||
// but can also be used to instruct loopy to update its internal state. It
|
||||
// shouldn't be confused with an HTTP2 frame, although some of the control
|
||||
// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
|
||||
type controlBuffer struct {
|
||||
ch chan struct{}
|
||||
done <-chan struct{}
|
||||
wakeupCh chan struct{} // Unblocks readers waiting for something to read.
|
||||
done <-chan struct{} // Closed when the transport is done.
|
||||
|
||||
// Mutex guards all the fields below, except trfChan which can be read
|
||||
// atomically without holding mu.
|
||||
mu sync.Mutex
|
||||
consumerWaiting bool
|
||||
list *itemList
|
||||
err error
|
||||
consumerWaiting bool // True when readers are blocked waiting for new data.
|
||||
closed bool // True when the controlbuf is finished.
|
||||
list *itemList // List of queued control frames.
|
||||
|
||||
// transportResponseFrames counts the number of queued items that represent
|
||||
// the response of an action initiated by the peer. trfChan is created
|
||||
@@ -308,47 +313,59 @@ type controlBuffer struct {
|
||||
// closed and nilled when transportResponseFrames drops below the
|
||||
// threshold. Both fields are protected by mu.
|
||||
transportResponseFrames int
|
||||
trfChan atomic.Value // chan struct{}
|
||||
trfChan atomic.Pointer[chan struct{}]
|
||||
}
|
||||
|
||||
func newControlBuffer(done <-chan struct{}) *controlBuffer {
|
||||
return &controlBuffer{
|
||||
ch: make(chan struct{}, 1),
|
||||
list: &itemList{},
|
||||
done: done,
|
||||
wakeupCh: make(chan struct{}, 1),
|
||||
list: &itemList{},
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
|
||||
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
|
||||
// controlbuf.
|
||||
// throttle blocks if there are too many frames in the control buf that
|
||||
// represent the response of an action initiated by the peer, like
|
||||
// incomingSettings cleanupStreams etc.
|
||||
func (c *controlBuffer) throttle() {
|
||||
ch, _ := c.trfChan.Load().(chan struct{})
|
||||
if ch != nil {
|
||||
if ch := c.trfChan.Load(); ch != nil {
|
||||
select {
|
||||
case <-ch:
|
||||
case <-(*ch):
|
||||
case <-c.done:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// put adds an item to the controlbuf.
|
||||
func (c *controlBuffer) put(it cbItem) error {
|
||||
_, err := c.executeAndPut(nil, it)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {
|
||||
var wakeUp bool
|
||||
// executeAndPut runs f, and if the return value is true, adds the given item to
|
||||
// the controlbuf. The item could be nil, in which case, this method simply
|
||||
// executes f and does not add the item to the controlbuf.
|
||||
//
|
||||
// The first return value indicates whether the item was successfully added to
|
||||
// the control buffer. A non-nil error, specifically ErrConnClosing, is returned
|
||||
// if the control buffer is already closed.
|
||||
func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
|
||||
c.mu.Lock()
|
||||
if c.err != nil {
|
||||
c.mu.Unlock()
|
||||
return false, c.err
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.closed {
|
||||
return false, ErrConnClosing
|
||||
}
|
||||
if f != nil {
|
||||
if !f(it) { // f wasn't successful
|
||||
c.mu.Unlock()
|
||||
if !f() { // f wasn't successful
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
if it == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var wakeUp bool
|
||||
if c.consumerWaiting {
|
||||
wakeUp = true
|
||||
c.consumerWaiting = false
|
||||
@@ -359,98 +376,102 @@ func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, err
|
||||
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
|
||||
// We are adding the frame that puts us over the threshold; create
|
||||
// a throttling channel.
|
||||
c.trfChan.Store(make(chan struct{}))
|
||||
ch := make(chan struct{})
|
||||
c.trfChan.Store(&ch)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
if wakeUp {
|
||||
select {
|
||||
case c.ch <- struct{}{}:
|
||||
case c.wakeupCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Note argument f should never be nil.
|
||||
func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
|
||||
c.mu.Lock()
|
||||
if c.err != nil {
|
||||
c.mu.Unlock()
|
||||
return false, c.err
|
||||
}
|
||||
if !f(it) { // f wasn't successful
|
||||
c.mu.Unlock()
|
||||
return false, nil
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// get returns the next control frame from the control buffer. If block is true
|
||||
// **and** there are no control frames in the control buffer, the call blocks
|
||||
// until one of the conditions is met: there is a frame to return or the
|
||||
// transport is closed.
|
||||
func (c *controlBuffer) get(block bool) (any, error) {
|
||||
for {
|
||||
c.mu.Lock()
|
||||
if c.err != nil {
|
||||
frame, err := c.getOnceLocked()
|
||||
if frame != nil || err != nil || !block {
|
||||
// If we read a frame or an error, we can return to the caller. The
|
||||
// call to getOnceLocked() returns a nil frame and a nil error if
|
||||
// there is nothing to read, and in that case, if the caller asked
|
||||
// us not to block, we can return now as well.
|
||||
c.mu.Unlock()
|
||||
return nil, c.err
|
||||
}
|
||||
if !c.list.isEmpty() {
|
||||
h := c.list.dequeue().(cbItem)
|
||||
if h.isTransportResponseFrame() {
|
||||
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
|
||||
// We are removing the frame that put us over the
|
||||
// threshold; close and clear the throttling channel.
|
||||
ch := c.trfChan.Load().(chan struct{})
|
||||
close(ch)
|
||||
c.trfChan.Store((chan struct{})(nil))
|
||||
}
|
||||
c.transportResponseFrames--
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return h, nil
|
||||
}
|
||||
if !block {
|
||||
c.mu.Unlock()
|
||||
return nil, nil
|
||||
return frame, err
|
||||
}
|
||||
c.consumerWaiting = true
|
||||
c.mu.Unlock()
|
||||
|
||||
// Release the lock above and wait to be woken up.
|
||||
select {
|
||||
case <-c.ch:
|
||||
case <-c.wakeupCh:
|
||||
case <-c.done:
|
||||
return nil, errors.New("transport closed by client")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Callers must not use this method, but should instead use get().
|
||||
//
|
||||
// Caller must hold c.mu.
|
||||
func (c *controlBuffer) getOnceLocked() (any, error) {
|
||||
if c.closed {
|
||||
return false, ErrConnClosing
|
||||
}
|
||||
if c.list.isEmpty() {
|
||||
return nil, nil
|
||||
}
|
||||
h := c.list.dequeue().(cbItem)
|
||||
if h.isTransportResponseFrame() {
|
||||
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
|
||||
// We are removing the frame that put us over the
|
||||
// threshold; close and clear the throttling channel.
|
||||
ch := c.trfChan.Swap(nil)
|
||||
close(*ch)
|
||||
}
|
||||
c.transportResponseFrames--
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// finish closes the control buffer, cleaning up any streams that have queued
|
||||
// header frames. Once this method returns, no more frames can be added to the
|
||||
// control buffer, and attempts to do so will return ErrConnClosing.
|
||||
func (c *controlBuffer) finish() {
|
||||
c.mu.Lock()
|
||||
if c.err != nil {
|
||||
c.mu.Unlock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.closed {
|
||||
return
|
||||
}
|
||||
c.err = ErrConnClosing
|
||||
c.closed = true
|
||||
// There may be headers for streams in the control buffer.
|
||||
// These streams need to be cleaned out since the transport
|
||||
// is still not aware of these yet.
|
||||
for head := c.list.dequeueAll(); head != nil; head = head.next {
|
||||
hdr, ok := head.it.(*headerFrame)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if hdr.onOrphaned != nil { // It will be nil on the server-side.
|
||||
hdr.onOrphaned(ErrConnClosing)
|
||||
switch v := head.it.(type) {
|
||||
case *headerFrame:
|
||||
if v.onOrphaned != nil { // It will be nil on the server-side.
|
||||
v.onOrphaned(ErrConnClosing)
|
||||
}
|
||||
case *dataFrame:
|
||||
_ = v.reader.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// In case throttle() is currently in flight, it needs to be unblocked.
|
||||
// Otherwise, the transport may not close, since the transport is closed by
|
||||
// the reader encountering the connection error.
|
||||
ch, _ := c.trfChan.Load().(chan struct{})
|
||||
ch := c.trfChan.Swap(nil)
|
||||
if ch != nil {
|
||||
close(ch)
|
||||
close(*ch)
|
||||
}
|
||||
c.trfChan.Store((chan struct{})(nil))
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
type side int
|
||||
@@ -466,7 +487,7 @@ const (
|
||||
// stream maintains a queue of data frames; as loopy receives data frames
|
||||
// it gets added to the queue of the relevant stream.
|
||||
// Loopy goes over this list of active streams by processing one node every iteration,
|
||||
// thereby closely resemebling to a round-robin scheduling over all streams. While
|
||||
// thereby closely resembling a round-robin scheduling over all streams. While
|
||||
// processing a stream, loopy writes out data bytes from this stream capped by the min
|
||||
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
|
||||
type loopyWriter struct {
|
||||
@@ -490,26 +511,29 @@ type loopyWriter struct {
|
||||
draining bool
|
||||
conn net.Conn
|
||||
logger *grpclog.PrefixLogger
|
||||
bufferPool mem.BufferPool
|
||||
|
||||
// Side-specific handlers
|
||||
ssGoAwayHandler func(*goAway) (bool, error)
|
||||
}
|
||||
|
||||
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
|
||||
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
|
||||
var buf bytes.Buffer
|
||||
l := &loopyWriter{
|
||||
side: s,
|
||||
cbuf: cbuf,
|
||||
sendQuota: defaultWindowSize,
|
||||
oiws: defaultWindowSize,
|
||||
estdStreams: make(map[uint32]*outStream),
|
||||
activeStreams: newOutStreamList(),
|
||||
framer: fr,
|
||||
hBuf: &buf,
|
||||
hEnc: hpack.NewEncoder(&buf),
|
||||
bdpEst: bdpEst,
|
||||
conn: conn,
|
||||
logger: logger,
|
||||
side: s,
|
||||
cbuf: cbuf,
|
||||
sendQuota: defaultWindowSize,
|
||||
oiws: defaultWindowSize,
|
||||
estdStreams: make(map[uint32]*outStream),
|
||||
activeStreams: newOutStreamList(),
|
||||
framer: fr,
|
||||
hBuf: &buf,
|
||||
hEnc: hpack.NewEncoder(&buf),
|
||||
bdpEst: bdpEst,
|
||||
conn: conn,
|
||||
logger: logger,
|
||||
ssGoAwayHandler: goAwayHandler,
|
||||
bufferPool: bufferPool,
|
||||
}
|
||||
return l
|
||||
}
|
||||
@@ -767,6 +791,11 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
|
||||
// not be established yet.
|
||||
delete(l.estdStreams, c.streamID)
|
||||
str.deleteSelf()
|
||||
for head := str.itl.dequeueAll(); head != nil; head = head.next {
|
||||
if df, ok := head.it.(*dataFrame); ok {
|
||||
_ = df.reader.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
if c.rst { // If RST_STREAM needs to be sent.
|
||||
if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
|
||||
@@ -902,16 +931,18 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
|
||||
// A data item is represented by a dataFrame, since it later translates into
|
||||
// multiple HTTP2 data frames.
|
||||
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
|
||||
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
|
||||
// maximum possible HTTP2 frame size.
|
||||
// Every dataFrame has two buffers; h that keeps grpc-message header and data
|
||||
// that is the actual message. As an optimization to keep wire traffic low, data
|
||||
// from data is copied to h to make as big as the maximum possible HTTP2 frame
|
||||
// size.
|
||||
|
||||
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
|
||||
if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
|
||||
// Client sends out empty data frame with endStream = true
|
||||
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
|
||||
return false, err
|
||||
}
|
||||
str.itl.dequeue() // remove the empty data item from stream
|
||||
_ = dataItem.reader.Close()
|
||||
if str.itl.isEmpty() {
|
||||
str.state = empty
|
||||
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
|
||||
@@ -926,9 +957,7 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
var (
|
||||
buf []byte
|
||||
)
|
||||
|
||||
// Figure out the maximum size we can send
|
||||
maxSize := http2MaxFrameLen
|
||||
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
|
||||
@@ -942,43 +971,50 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
}
|
||||
// Compute how much of the header and data we can send within quota and max frame length
|
||||
hSize := min(maxSize, len(dataItem.h))
|
||||
dSize := min(maxSize-hSize, len(dataItem.d))
|
||||
if hSize != 0 {
|
||||
if dSize == 0 {
|
||||
buf = dataItem.h
|
||||
} else {
|
||||
// We can add some data to grpc message header to distribute bytes more equally across frames.
|
||||
// Copy on the stack to avoid generating garbage
|
||||
var localBuf [http2MaxFrameLen]byte
|
||||
copy(localBuf[:hSize], dataItem.h)
|
||||
copy(localBuf[hSize:], dataItem.d[:dSize])
|
||||
buf = localBuf[:hSize+dSize]
|
||||
}
|
||||
} else {
|
||||
buf = dataItem.d
|
||||
}
|
||||
|
||||
dSize := min(maxSize-hSize, dataItem.reader.Remaining())
|
||||
remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
|
||||
size := hSize + dSize
|
||||
|
||||
var buf *[]byte
|
||||
|
||||
if hSize != 0 && dSize == 0 {
|
||||
buf = &dataItem.h
|
||||
} else {
|
||||
// Note: this is only necessary because the http2.Framer does not support
|
||||
// partially writing a frame, so the sequence must be materialized into a buffer.
|
||||
// TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
|
||||
pool := l.bufferPool
|
||||
if pool == nil {
|
||||
// Note that this is only supposed to be nil in tests. Otherwise, stream is
|
||||
// always initialized with a BufferPool.
|
||||
pool = mem.DefaultBufferPool()
|
||||
}
|
||||
buf = pool.Get(size)
|
||||
defer pool.Put(buf)
|
||||
|
||||
copy((*buf)[:hSize], dataItem.h)
|
||||
_, _ = dataItem.reader.Read((*buf)[hSize:])
|
||||
}
|
||||
|
||||
// Now that outgoing flow controls are checked we can replenish str's write quota
|
||||
str.wq.replenish(size)
|
||||
var endStream bool
|
||||
// If this is the last data message on this stream and all of it can be written in this iteration.
|
||||
if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
|
||||
if dataItem.endStream && remainingBytes == 0 {
|
||||
endStream = true
|
||||
}
|
||||
if dataItem.onEachWrite != nil {
|
||||
dataItem.onEachWrite()
|
||||
}
|
||||
if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
|
||||
if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
|
||||
return false, err
|
||||
}
|
||||
str.bytesOutStanding += size
|
||||
l.sendQuota -= uint32(size)
|
||||
dataItem.h = dataItem.h[hSize:]
|
||||
dataItem.d = dataItem.d[dSize:]
|
||||
|
||||
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
|
||||
if remainingBytes == 0 { // All the data from that message was written out.
|
||||
_ = dataItem.reader.Close()
|
||||
str.itl.dequeue()
|
||||
}
|
||||
if str.itl.isEmpty() {
|
||||
@@ -997,10 +1033,3 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
9
vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
generated
vendored
9
vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
generated
vendored
@@ -92,14 +92,11 @@ func (f *trInFlow) newLimit(n uint32) uint32 {
|
||||
|
||||
func (f *trInFlow) onData(n uint32) uint32 {
|
||||
f.unacked += n
|
||||
if f.unacked >= f.limit/4 {
|
||||
w := f.unacked
|
||||
f.unacked = 0
|
||||
if f.unacked < f.limit/4 {
|
||||
f.updateEffectiveWindowSize()
|
||||
return w
|
||||
return 0
|
||||
}
|
||||
f.updateEffectiveWindowSize()
|
||||
return 0
|
||||
return f.reset()
|
||||
}
|
||||
|
||||
func (f *trInFlow) reset() uint32 {
|
||||
|
83
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
83
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
@@ -24,7 +24,6 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -40,6 +39,7 @@ import (
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/stats"
|
||||
@@ -50,7 +50,7 @@ import (
|
||||
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
|
||||
// inside an http.Handler, or writes an HTTP error to w and returns an error.
|
||||
// It requires that the http Server supports HTTP/2.
|
||||
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
|
||||
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
|
||||
if r.Method != http.MethodPost {
|
||||
w.Header().Set("Allow", http.MethodPost)
|
||||
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
|
||||
@@ -98,6 +98,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
|
||||
contentType: contentType,
|
||||
contentSubtype: contentSubtype,
|
||||
stats: stats,
|
||||
bufferPool: bufferPool,
|
||||
}
|
||||
st.logger = prefixLoggerForServerHandlerTransport(st)
|
||||
|
||||
@@ -171,6 +172,8 @@ type serverHandlerTransport struct {
|
||||
|
||||
stats []stats.Handler
|
||||
logger *grpclog.PrefixLogger
|
||||
|
||||
bufferPool mem.BufferPool
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) Close(err error) {
|
||||
@@ -222,7 +225,7 @@ func (ht *serverHandlerTransport) do(fn func()) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
|
||||
func (ht *serverHandlerTransport) writeStatus(s *ServerStream, st *status.Status) error {
|
||||
ht.writeStatusMu.Lock()
|
||||
defer ht.writeStatusMu.Unlock()
|
||||
|
||||
@@ -244,6 +247,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
||||
}
|
||||
|
||||
s.hdrMu.Lock()
|
||||
defer s.hdrMu.Unlock()
|
||||
if p := st.Proto(); p != nil && len(p.Details) > 0 {
|
||||
delete(s.trailer, grpcStatusDetailsBinHeader)
|
||||
stBytes, err := proto.Marshal(p)
|
||||
@@ -268,7 +272,6 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
||||
}
|
||||
}
|
||||
}
|
||||
s.hdrMu.Unlock()
|
||||
})
|
||||
|
||||
if err == nil { // transport has not been closed
|
||||
@@ -286,14 +289,14 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
||||
|
||||
// writePendingHeaders sets common and custom headers on the first
|
||||
// write call (Write, WriteHeader, or WriteStatus)
|
||||
func (ht *serverHandlerTransport) writePendingHeaders(s *Stream) {
|
||||
func (ht *serverHandlerTransport) writePendingHeaders(s *ServerStream) {
|
||||
ht.writeCommonHeaders(s)
|
||||
ht.writeCustomHeaders(s)
|
||||
}
|
||||
|
||||
// writeCommonHeaders sets common headers on the first write
|
||||
// call (Write, WriteHeader, or WriteStatus).
|
||||
func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
|
||||
func (ht *serverHandlerTransport) writeCommonHeaders(s *ServerStream) {
|
||||
h := ht.rw.Header()
|
||||
h["Date"] = nil // suppress Date to make tests happy; TODO: restore
|
||||
h.Set("Content-Type", ht.contentType)
|
||||
@@ -314,7 +317,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
|
||||
|
||||
// writeCustomHeaders sets custom headers set on the stream via SetHeader
|
||||
// on the first write call (Write, WriteHeader, or WriteStatus)
|
||||
func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
|
||||
func (ht *serverHandlerTransport) writeCustomHeaders(s *ServerStream) {
|
||||
h := ht.rw.Header()
|
||||
|
||||
s.hdrMu.Lock()
|
||||
@@ -330,19 +333,31 @@ func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
|
||||
s.hdrMu.Unlock()
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
||||
func (ht *serverHandlerTransport) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
|
||||
// Always take a reference because otherwise there is no guarantee the data will
|
||||
// be available after this function returns. This is what callers to Write
|
||||
// expect.
|
||||
data.Ref()
|
||||
headersWritten := s.updateHeaderSent()
|
||||
return ht.do(func() {
|
||||
err := ht.do(func() {
|
||||
defer data.Free()
|
||||
if !headersWritten {
|
||||
ht.writePendingHeaders(s)
|
||||
}
|
||||
ht.rw.Write(hdr)
|
||||
ht.rw.Write(data)
|
||||
for _, b := range data {
|
||||
_, _ = ht.rw.Write(b.ReadOnlyData())
|
||||
}
|
||||
ht.rw.(http.Flusher).Flush()
|
||||
})
|
||||
if err != nil {
|
||||
data.Free()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) error {
|
||||
if err := s.SetHeader(md); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -370,7 +385,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {
|
||||
func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) {
|
||||
// With this transport type there will be exactly 1 stream: this HTTP request.
|
||||
var cancel context.CancelFunc
|
||||
if ht.timeoutSet {
|
||||
@@ -393,20 +408,22 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
|
||||
|
||||
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
|
||||
req := ht.req
|
||||
s := &Stream{
|
||||
id: 0, // irrelevant
|
||||
ctx: ctx,
|
||||
requestRead: func(int) {},
|
||||
s := &ServerStream{
|
||||
Stream: &Stream{
|
||||
id: 0, // irrelevant
|
||||
ctx: ctx,
|
||||
requestRead: func(int) {},
|
||||
buf: newRecvBuffer(),
|
||||
method: req.URL.Path,
|
||||
recvCompress: req.Header.Get("grpc-encoding"),
|
||||
contentSubtype: ht.contentSubtype,
|
||||
},
|
||||
cancel: cancel,
|
||||
buf: newRecvBuffer(),
|
||||
st: ht,
|
||||
method: req.URL.Path,
|
||||
recvCompress: req.Header.Get("grpc-encoding"),
|
||||
contentSubtype: ht.contentSubtype,
|
||||
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
|
||||
}
|
||||
s.trReader = &transportReader{
|
||||
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
|
||||
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
|
||||
windowHandler: func(int) {},
|
||||
}
|
||||
|
||||
@@ -415,21 +432,19 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
|
||||
go func() {
|
||||
defer close(readerDone)
|
||||
|
||||
// TODO: minimize garbage, optimize recvBuffer code/ownership
|
||||
const readSize = 8196
|
||||
for buf := make([]byte, readSize); ; {
|
||||
n, err := req.Body.Read(buf)
|
||||
for {
|
||||
buf := ht.bufferPool.Get(http2MaxFrameLen)
|
||||
n, err := req.Body.Read(*buf)
|
||||
if n > 0 {
|
||||
s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
|
||||
buf = buf[n:]
|
||||
*buf = (*buf)[:n]
|
||||
s.buf.put(recvMsg{buffer: mem.NewBuffer(buf, ht.bufferPool)})
|
||||
} else {
|
||||
ht.bufferPool.Put(buf)
|
||||
}
|
||||
if err != nil {
|
||||
s.buf.put(recvMsg{err: mapRecvMsgError(err)})
|
||||
return
|
||||
}
|
||||
if len(buf) == 0 {
|
||||
buf = make([]byte, readSize)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -458,11 +473,9 @@ func (ht *serverHandlerTransport) runStream() {
|
||||
}
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) IncrMsgSent() {}
|
||||
func (ht *serverHandlerTransport) incrMsgRecv() {}
|
||||
|
||||
func (ht *serverHandlerTransport) IncrMsgRecv() {}
|
||||
|
||||
func (ht *serverHandlerTransport) Drain(debugData string) {
|
||||
func (ht *serverHandlerTransport) Drain(string) {
|
||||
panic("Drain() is not implemented")
|
||||
}
|
||||
|
||||
@@ -485,5 +498,5 @@ func mapRecvMsgError(err error) error {
|
||||
if strings.Contains(err.Error(), "body closed by handler") {
|
||||
return status.Error(codes.Canceled, err.Error())
|
||||
}
|
||||
return connectionErrorf(true, err, err.Error())
|
||||
return connectionErrorf(true, err, "%s", err.Error())
|
||||
}
|
||||
|
301
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
301
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@@ -43,10 +43,12 @@ import (
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
imetadata "google.golang.org/grpc/internal/metadata"
|
||||
"google.golang.org/grpc/internal/proxyattributes"
|
||||
istatus "google.golang.org/grpc/internal/status"
|
||||
isyscall "google.golang.org/grpc/internal/syscall"
|
||||
"google.golang.org/grpc/internal/transport/networktype"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/resolver"
|
||||
@@ -59,6 +61,8 @@ import (
|
||||
// atomically.
|
||||
var clientConnectionCounter uint64
|
||||
|
||||
var goAwayLoopyWriterTimeout = 5 * time.Second
|
||||
|
||||
var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
|
||||
|
||||
// http2Client implements the ClientTransport interface with HTTP2.
|
||||
@@ -83,9 +87,9 @@ type http2Client struct {
|
||||
writerDone chan struct{} // sync point to enable testing.
|
||||
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
|
||||
// that the server sent GoAway on this transport.
|
||||
goAway chan struct{}
|
||||
|
||||
framer *framer
|
||||
goAway chan struct{}
|
||||
keepaliveDone chan struct{} // Closed when the keepalive goroutine exits.
|
||||
framer *framer
|
||||
// controlBuf delivers all the control related tasks (e.g., window
|
||||
// updates, reset streams, and various settings) to the controller.
|
||||
// Do not access controlBuf with mu held.
|
||||
@@ -114,13 +118,13 @@ type http2Client struct {
|
||||
streamQuota int64
|
||||
streamsQuotaAvailable chan struct{}
|
||||
waitingStreams uint32
|
||||
nextID uint32
|
||||
registeredCompressors string
|
||||
|
||||
// Do not access controlBuf with mu held.
|
||||
mu sync.Mutex // guard the following variables
|
||||
nextID uint32
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
activeStreams map[uint32]*ClientStream
|
||||
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
|
||||
prevGoAwayID uint32
|
||||
// goAwayReason records the http2.ErrCode and debug data received with the
|
||||
@@ -144,13 +148,13 @@ type http2Client struct {
|
||||
|
||||
onClose func(GoAwayReason)
|
||||
|
||||
bufferPool *bufferPool
|
||||
bufferPool mem.BufferPool
|
||||
|
||||
connectionID uint64
|
||||
logger *grpclog.PrefixLogger
|
||||
}
|
||||
|
||||
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
|
||||
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, grpcUA string) (net.Conn, error) {
|
||||
address := addr.Addr
|
||||
networkType, ok := networktype.Get(addr)
|
||||
if fn != nil {
|
||||
@@ -172,10 +176,10 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
|
||||
return fn(ctx, address)
|
||||
}
|
||||
if !ok {
|
||||
networkType, address = parseDialTarget(address)
|
||||
networkType, address = ParseDialTarget(address)
|
||||
}
|
||||
if networkType == "tcp" && useProxy {
|
||||
return proxyDial(ctx, address, grpcUA)
|
||||
if opts, present := proxyattributes.Get(addr); present {
|
||||
return proxyDial(ctx, addr, grpcUA, opts)
|
||||
}
|
||||
return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)
|
||||
}
|
||||
@@ -196,10 +200,10 @@ func isTemporary(err error) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// NewHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// and starts to receive messages on it. Non-nil error returns if construction
|
||||
// fails.
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
|
||||
func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ ClientTransport, err error) {
|
||||
scheme := "http"
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
@@ -214,7 +218,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
// address specific arbitrary data to reach custom dialers and credential handshakers.
|
||||
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
|
||||
|
||||
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
|
||||
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UserAgent)
|
||||
if err != nil {
|
||||
if opts.FailOnNonTempDialError {
|
||||
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
|
||||
@@ -229,7 +233,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
}
|
||||
}(conn)
|
||||
|
||||
// The following defer and goroutine monitor the connectCtx for cancelation
|
||||
// The following defer and goroutine monitor the connectCtx for cancellation
|
||||
// and deadline. On context expiration, the connection is hard closed and
|
||||
// this function will naturally fail as a result. Otherwise, the defer
|
||||
// waits for the goroutine to exit to prevent the context from being
|
||||
@@ -332,10 +336,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
readerDone: make(chan struct{}),
|
||||
writerDone: make(chan struct{}),
|
||||
goAway: make(chan struct{}),
|
||||
keepaliveDone: make(chan struct{}),
|
||||
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
|
||||
fc: &trInFlow{limit: uint32(icwz)},
|
||||
scheme: scheme,
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
activeStreams: make(map[uint32]*ClientStream),
|
||||
isSecure: isSecure,
|
||||
perRPCCreds: perRPCCreds,
|
||||
kp: kp,
|
||||
@@ -346,7 +351,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
streamQuota: defaultMaxStreamsClient,
|
||||
streamsQuotaAvailable: make(chan struct{}, 1),
|
||||
keepaliveEnabled: keepaliveEnabled,
|
||||
bufferPool: newBufferPool(),
|
||||
bufferPool: opts.BufferPool,
|
||||
onClose: onClose,
|
||||
}
|
||||
var czSecurity credentials.ChannelzSecurityValue
|
||||
@@ -408,10 +413,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
readerErrCh := make(chan error, 1)
|
||||
go t.reader(readerErrCh)
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = <-readerErrCh
|
||||
}
|
||||
if err != nil {
|
||||
// writerDone should be closed since the loopy goroutine
|
||||
// wouldn't have started in the case this function returns an error.
|
||||
close(t.writerDone)
|
||||
t.Close(err)
|
||||
}
|
||||
}()
|
||||
@@ -458,8 +463,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
if err := t.framer.writer.Flush(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Block until the server preface is received successfully or an error occurs.
|
||||
if err = <-readerErrCh; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
|
||||
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
|
||||
if err := t.loopy.run(); !isIOError(err) {
|
||||
// Immediately close the connection, as the loopy writer returns
|
||||
// when there are no more active streams and we were draining (the
|
||||
@@ -472,17 +481,19 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {
|
||||
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
|
||||
s := &Stream{
|
||||
ct: t,
|
||||
done: make(chan struct{}),
|
||||
method: callHdr.Method,
|
||||
sendCompress: callHdr.SendCompress,
|
||||
buf: newRecvBuffer(),
|
||||
headerChan: make(chan struct{}),
|
||||
contentSubtype: callHdr.ContentSubtype,
|
||||
doneFunc: callHdr.DoneFunc,
|
||||
s := &ClientStream{
|
||||
Stream: &Stream{
|
||||
method: callHdr.Method,
|
||||
sendCompress: callHdr.SendCompress,
|
||||
buf: newRecvBuffer(),
|
||||
contentSubtype: callHdr.ContentSubtype,
|
||||
},
|
||||
ct: t,
|
||||
done: make(chan struct{}),
|
||||
headerChan: make(chan struct{}),
|
||||
doneFunc: callHdr.DoneFunc,
|
||||
}
|
||||
s.wq = newWriteQuota(defaultWriteQuota, s.done)
|
||||
s.requestRead = func(n int) {
|
||||
@@ -498,9 +509,8 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||
ctxDone: s.ctx.Done(),
|
||||
recv: s.buf,
|
||||
closeStream: func(err error) {
|
||||
t.CloseStream(s, err)
|
||||
s.Close(err)
|
||||
},
|
||||
freeBuffer: t.bufferPool.put,
|
||||
},
|
||||
windowHandler: func(n int) {
|
||||
t.updateWindow(s, uint32(n))
|
||||
@@ -517,6 +527,18 @@ func (t *http2Client) getPeer() *peer.Peer {
|
||||
}
|
||||
}
|
||||
|
||||
// OutgoingGoAwayHandler writes a GOAWAY to the connection. Always returns (false, err) as we want the GoAway
|
||||
// to be the last frame loopy writes to the transport.
|
||||
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||
t.mu.Lock()
|
||||
maxStreamID := t.nextID - 2
|
||||
t.mu.Unlock()
|
||||
if err := t.framer.fr.WriteGoAway(maxStreamID, http2.ErrCodeNo, g.debugData); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, g.closeConn
|
||||
}
|
||||
|
||||
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
|
||||
aud := t.createAudience(callHdr)
|
||||
ri := credentials.RequestInfo{
|
||||
@@ -578,12 +600,6 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
|
||||
for k, v := range callAuthData {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
||||
}
|
||||
if b := stats.OutgoingTags(ctx); b != nil {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
|
||||
}
|
||||
if b := stats.OutgoingTrace(ctx); b != nil {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
|
||||
}
|
||||
|
||||
if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
|
||||
var k string
|
||||
@@ -719,7 +735,7 @@ func (e NewStreamError) Error() string {
|
||||
|
||||
// NewStream creates a stream and registers it into the transport as "active"
|
||||
// streams. All non-nil errors returned will be *NewStreamError.
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error) {
|
||||
ctx = peer.NewContext(ctx, t.getPeer())
|
||||
|
||||
// ServerName field of the resolver returned address takes precedence over
|
||||
@@ -744,7 +760,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
return
|
||||
}
|
||||
// The stream was unprocessed by the server.
|
||||
atomic.StoreUint32(&s.unprocessed, 1)
|
||||
s.unprocessed.Store(true)
|
||||
s.write(recvMsg{err: err})
|
||||
close(s.done)
|
||||
// If headerChan isn't closed, then close it.
|
||||
@@ -755,7 +771,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
hdr := &headerFrame{
|
||||
hf: headerFields,
|
||||
endStream: false,
|
||||
initStream: func(id uint32) error {
|
||||
initStream: func(uint32) error {
|
||||
t.mu.Lock()
|
||||
// TODO: handle transport closure in loopy instead and remove this
|
||||
// initStream is never called when transport is draining.
|
||||
@@ -781,7 +797,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
firstTry := true
|
||||
var ch chan struct{}
|
||||
transportDrainRequired := false
|
||||
checkForStreamQuota := func(it any) bool {
|
||||
checkForStreamQuota := func() bool {
|
||||
if t.streamQuota <= 0 { // Can go negative if server decreases it.
|
||||
if firstTry {
|
||||
t.waitingStreams++
|
||||
@@ -793,23 +809,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
t.waitingStreams--
|
||||
}
|
||||
t.streamQuota--
|
||||
h := it.(*headerFrame)
|
||||
h.streamID = t.nextID
|
||||
t.nextID += 2
|
||||
|
||||
// Drain client transport if nextID > MaxStreamID which signals gRPC that
|
||||
// the connection is closed and a new one must be created for subsequent RPCs.
|
||||
transportDrainRequired = t.nextID > MaxStreamID
|
||||
|
||||
s.id = h.streamID
|
||||
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
|
||||
t.mu.Lock()
|
||||
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
|
||||
t.mu.Unlock()
|
||||
return false // Don't create a stream if the transport is already closed.
|
||||
}
|
||||
|
||||
hdr.streamID = t.nextID
|
||||
t.nextID += 2
|
||||
// Drain client transport if nextID > MaxStreamID which signals gRPC that
|
||||
// the connection is closed and a new one must be created for subsequent RPCs.
|
||||
transportDrainRequired = t.nextID > MaxStreamID
|
||||
|
||||
s.id = hdr.streamID
|
||||
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
|
||||
t.activeStreams[s.id] = s
|
||||
t.mu.Unlock()
|
||||
|
||||
if t.streamQuota > 0 && t.waitingStreams > 0 {
|
||||
select {
|
||||
case t.streamsQuotaAvailable <- struct{}{}:
|
||||
@@ -819,13 +836,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
return true
|
||||
}
|
||||
var hdrListSizeErr error
|
||||
checkForHeaderListSize := func(it any) bool {
|
||||
checkForHeaderListSize := func() bool {
|
||||
if t.maxSendHeaderListSize == nil {
|
||||
return true
|
||||
}
|
||||
hdrFrame := it.(*headerFrame)
|
||||
var sz int64
|
||||
for _, f := range hdrFrame.hf {
|
||||
for _, f := range hdr.hf {
|
||||
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
|
||||
hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
|
||||
return false
|
||||
@@ -834,8 +850,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
return true
|
||||
}
|
||||
for {
|
||||
success, err := t.controlBuf.executeAndPut(func(it any) bool {
|
||||
return checkForHeaderListSize(it) && checkForStreamQuota(it)
|
||||
success, err := t.controlBuf.executeAndPut(func() bool {
|
||||
return checkForHeaderListSize() && checkForStreamQuota()
|
||||
}, hdr)
|
||||
if err != nil {
|
||||
// Connection closed.
|
||||
@@ -889,21 +905,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// CloseStream clears the footprint of a stream when the stream is not needed any more.
|
||||
// This must not be executed in reader's goroutine.
|
||||
func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
var (
|
||||
rst bool
|
||||
rstCode http2.ErrCode
|
||||
)
|
||||
if err != nil {
|
||||
rst = true
|
||||
rstCode = http2.ErrCodeCancel
|
||||
}
|
||||
t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
|
||||
}
|
||||
|
||||
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
|
||||
func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
|
||||
// Set stream status to done.
|
||||
if s.swapState(streamDone) == streamDone {
|
||||
// If it was already done, return. If multiple closeStream calls
|
||||
@@ -946,7 +948,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
||||
rst: rst,
|
||||
rstCode: rstCode,
|
||||
}
|
||||
addBackStreamQuota := func(any) bool {
|
||||
addBackStreamQuota := func() bool {
|
||||
t.streamQuota++
|
||||
if t.streamQuota > 0 && t.waitingStreams > 0 {
|
||||
select {
|
||||
@@ -966,8 +968,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
||||
|
||||
// Close kicks off the shutdown process of the transport. This should be called
|
||||
// only once on a transport. Once it is called, the transport should not be
|
||||
// accessed any more.
|
||||
// accessed anymore.
|
||||
func (t *http2Client) Close(err error) {
|
||||
t.conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
|
||||
t.mu.Lock()
|
||||
// Make sure we only close once.
|
||||
if t.state == closing {
|
||||
@@ -990,15 +993,33 @@ func (t *http2Client) Close(err error) {
|
||||
// should unblock it so that the goroutine eventually exits.
|
||||
t.kpDormancyCond.Signal()
|
||||
}
|
||||
t.mu.Unlock()
|
||||
t.controlBuf.finish()
|
||||
t.cancel()
|
||||
t.conn.Close()
|
||||
channelz.RemoveEntry(t.channelz.ID)
|
||||
// Append info about previous goaways if there were any, since this may be important
|
||||
// for understanding the root cause for this connection to be closed.
|
||||
_, goAwayDebugMessage := t.GetGoAwayReason()
|
||||
goAwayDebugMessage := t.goAwayDebugMessage
|
||||
t.mu.Unlock()
|
||||
|
||||
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
|
||||
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It
|
||||
// also waits for loopyWriter to be closed with a timer to avoid the
|
||||
// long blocking in case the connection is blackholed, i.e. TCP is
|
||||
// just stuck.
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
|
||||
timer := time.NewTimer(goAwayLoopyWriterTimeout)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-t.writerDone: // success
|
||||
case <-timer.C:
|
||||
t.logger.Infof("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout)
|
||||
}
|
||||
t.cancel()
|
||||
t.conn.Close()
|
||||
// Waits for the reader and keepalive goroutines to exit before returning to
|
||||
// ensure all resources are cleaned up before Close can return.
|
||||
<-t.readerDone
|
||||
if t.keepaliveEnabled {
|
||||
<-t.keepaliveDone
|
||||
}
|
||||
channelz.RemoveEntry(t.channelz.ID)
|
||||
var st *status.Status
|
||||
if len(goAwayDebugMessage) > 0 {
|
||||
st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
|
||||
@@ -1047,30 +1068,40 @@ func (t *http2Client) GracefulClose() {
|
||||
|
||||
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
|
||||
// should proceed only if Write returns nil.
|
||||
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
||||
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
|
||||
reader := data.Reader()
|
||||
|
||||
if opts.Last {
|
||||
// If it's the last message, update stream state.
|
||||
if !s.compareAndSwapState(streamActive, streamWriteDone) {
|
||||
_ = reader.Close()
|
||||
return errStreamDone
|
||||
}
|
||||
} else if s.getState() != streamActive {
|
||||
_ = reader.Close()
|
||||
return errStreamDone
|
||||
}
|
||||
df := &dataFrame{
|
||||
streamID: s.id,
|
||||
endStream: opts.Last,
|
||||
h: hdr,
|
||||
d: data,
|
||||
reader: reader,
|
||||
}
|
||||
if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
|
||||
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
|
||||
if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
|
||||
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
|
||||
_ = reader.Close()
|
||||
return err
|
||||
}
|
||||
}
|
||||
return t.controlBuf.put(df)
|
||||
if err := t.controlBuf.put(df); err != nil {
|
||||
_ = reader.Close()
|
||||
return err
|
||||
}
|
||||
t.incrMsgSent()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *http2Client) getStream(f http2.Frame) *Stream {
|
||||
func (t *http2Client) getStream(f http2.Frame) *ClientStream {
|
||||
t.mu.Lock()
|
||||
s := t.activeStreams[f.Header().StreamID]
|
||||
t.mu.Unlock()
|
||||
@@ -1080,7 +1111,7 @@ func (t *http2Client) getStream(f http2.Frame) *Stream {
|
||||
// adjustWindow sends out extra window update over the initial window size
|
||||
// of stream if the application is requesting data larger in size than
|
||||
// the window.
|
||||
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
|
||||
func (t *http2Client) adjustWindow(s *ClientStream, n uint32) {
|
||||
if w := s.fc.maybeAdjust(n); w > 0 {
|
||||
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
|
||||
}
|
||||
@@ -1089,7 +1120,7 @@ func (t *http2Client) adjustWindow(s *Stream, n uint32) {
|
||||
// updateWindow adjusts the inbound quota for the stream.
|
||||
// Window updates will be sent out when the cumulative quota
|
||||
// exceeds the corresponding threshold.
|
||||
func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
||||
func (t *http2Client) updateWindow(s *ClientStream, n uint32) {
|
||||
if w := s.fc.onRead(n); w > 0 {
|
||||
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
|
||||
}
|
||||
@@ -1099,7 +1130,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
||||
// for the transport and the stream based on the current bdp
|
||||
// estimation.
|
||||
func (t *http2Client) updateFlowControl(n uint32) {
|
||||
updateIWS := func(any) bool {
|
||||
updateIWS := func() bool {
|
||||
t.initialWindowSize = int32(n)
|
||||
t.mu.Lock()
|
||||
for _, s := range t.activeStreams {
|
||||
@@ -1172,10 +1203,13 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||
// guarantee f.Data() is consumed before the arrival of next frame.
|
||||
// Can this copy be eliminated?
|
||||
if len(f.Data()) > 0 {
|
||||
buffer := t.bufferPool.get()
|
||||
buffer.Reset()
|
||||
buffer.Write(f.Data())
|
||||
s.write(recvMsg{buffer: buffer})
|
||||
pool := t.bufferPool
|
||||
if pool == nil {
|
||||
// Note that this is only supposed to be nil in tests. Otherwise, stream is
|
||||
// always initialized with a BufferPool.
|
||||
pool = mem.DefaultBufferPool()
|
||||
}
|
||||
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
|
||||
}
|
||||
}
|
||||
// The server has closed the stream without sending trailers. Record that
|
||||
@@ -1192,7 +1226,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||
}
|
||||
if f.ErrCode == http2.ErrCodeRefusedStream {
|
||||
// The stream was unprocessed by the server.
|
||||
atomic.StoreUint32(&s.unprocessed, 1)
|
||||
s.unprocessed.Store(true)
|
||||
}
|
||||
statusCode, ok := http2ErrConvTab[f.ErrCode]
|
||||
if !ok {
|
||||
@@ -1204,11 +1238,12 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||
if statusCode == codes.Canceled {
|
||||
if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
|
||||
// Our deadline was already exceeded, and that was likely the cause
|
||||
// of this cancelation. Alter the status code accordingly.
|
||||
// of this cancellation. Alter the status code accordingly.
|
||||
statusCode = codes.DeadlineExceeded
|
||||
}
|
||||
}
|
||||
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
|
||||
st := status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode)
|
||||
t.closeStream(s, st.Err(), false, http2.ErrCodeNo, st, nil, false)
|
||||
}
|
||||
|
||||
func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
|
||||
@@ -1252,7 +1287,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
|
||||
}
|
||||
updateFuncs = append(updateFuncs, updateStreamQuota)
|
||||
}
|
||||
t.controlBuf.executeAndPut(func(any) bool {
|
||||
t.controlBuf.executeAndPut(func() bool {
|
||||
for _, f := range updateFuncs {
|
||||
f()
|
||||
}
|
||||
@@ -1273,11 +1308,11 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
|
||||
t.controlBuf.put(pingAck)
|
||||
}
|
||||
|
||||
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
|
||||
t.mu.Lock()
|
||||
if t.state == closing {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
return nil
|
||||
}
|
||||
if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
|
||||
// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
|
||||
@@ -1289,8 +1324,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
id := f.LastStreamID
|
||||
if id > 0 && id%2 == 0 {
|
||||
t.mu.Unlock()
|
||||
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
|
||||
return
|
||||
return connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id)
|
||||
}
|
||||
// A client can receive multiple GoAways from the server (see
|
||||
// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
|
||||
@@ -1307,8 +1341,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
|
||||
if id > t.prevGoAwayID {
|
||||
t.mu.Unlock()
|
||||
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
|
||||
return
|
||||
return connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)
|
||||
}
|
||||
default:
|
||||
t.setGoAwayReason(f)
|
||||
@@ -1332,15 +1365,14 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
t.prevGoAwayID = id
|
||||
if len(t.activeStreams) == 0 {
|
||||
t.mu.Unlock()
|
||||
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
|
||||
return
|
||||
return connectionErrorf(true, nil, "received goaway and there are no active streams")
|
||||
}
|
||||
|
||||
streamsToClose := make([]*Stream, 0)
|
||||
streamsToClose := make([]*ClientStream, 0)
|
||||
for streamID, stream := range t.activeStreams {
|
||||
if streamID > id && streamID <= upperLimit {
|
||||
// The stream was unprocessed by the server.
|
||||
atomic.StoreUint32(&stream.unprocessed, 1)
|
||||
stream.unprocessed.Store(true)
|
||||
streamsToClose = append(streamsToClose, stream)
|
||||
}
|
||||
}
|
||||
@@ -1350,6 +1382,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
for _, stream := range streamsToClose {
|
||||
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// setGoAwayReason sets the value of t.goAwayReason based
|
||||
@@ -1358,8 +1391,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
// the caller.
|
||||
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
|
||||
t.goAwayReason = GoAwayNoReason
|
||||
switch f.ErrCode {
|
||||
case http2.ErrCodeEnhanceYourCalm:
|
||||
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
|
||||
if string(f.DebugData()) == "too_many_pings" {
|
||||
t.goAwayReason = GoAwayTooManyPings
|
||||
}
|
||||
@@ -1391,7 +1423,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
return
|
||||
}
|
||||
endStream := frame.StreamEnded()
|
||||
atomic.StoreUint32(&s.bytesReceived, 1)
|
||||
s.bytesReceived.Store(true)
|
||||
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
|
||||
|
||||
if !initialHeader && !endStream {
|
||||
@@ -1585,7 +1617,13 @@ func (t *http2Client) readServerPreface() error {
|
||||
// network connection. If the server preface is not read successfully, an
|
||||
// error is pushed to errCh; otherwise errCh is closed with no error.
|
||||
func (t *http2Client) reader(errCh chan<- error) {
|
||||
defer close(t.readerDone)
|
||||
var errClose error
|
||||
defer func() {
|
||||
close(t.readerDone)
|
||||
if errClose != nil {
|
||||
t.Close(errClose)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := t.readServerPreface(); err != nil {
|
||||
errCh <- err
|
||||
@@ -1624,11 +1662,10 @@ func (t *http2Client) reader(errCh chan<- error) {
|
||||
t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
// Transport error.
|
||||
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
|
||||
return
|
||||
}
|
||||
// Transport error.
|
||||
errClose = connectionErrorf(true, err, "error reading from server: %v", err)
|
||||
return
|
||||
}
|
||||
switch frame := frame.(type) {
|
||||
case *http2.MetaHeadersFrame:
|
||||
@@ -1642,7 +1679,7 @@ func (t *http2Client) reader(errCh chan<- error) {
|
||||
case *http2.PingFrame:
|
||||
t.handlePing(frame)
|
||||
case *http2.GoAwayFrame:
|
||||
t.handleGoAway(frame)
|
||||
errClose = t.handleGoAway(frame)
|
||||
case *http2.WindowUpdateFrame:
|
||||
t.handleWindowUpdate(frame)
|
||||
default:
|
||||
@@ -1653,15 +1690,15 @@ func (t *http2Client) reader(errCh chan<- error) {
|
||||
}
|
||||
}
|
||||
|
||||
func minTime(a, b time.Duration) time.Duration {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
|
||||
func (t *http2Client) keepalive() {
|
||||
var err error
|
||||
defer func() {
|
||||
close(t.keepaliveDone)
|
||||
if err != nil {
|
||||
t.Close(err)
|
||||
}
|
||||
}()
|
||||
p := &ping{data: [8]byte{}}
|
||||
// True iff a ping has been sent, and no data has been received since then.
|
||||
outstandingPing := false
|
||||
@@ -1685,7 +1722,7 @@ func (t *http2Client) keepalive() {
|
||||
continue
|
||||
}
|
||||
if outstandingPing && timeoutLeft <= 0 {
|
||||
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
|
||||
err = connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")
|
||||
return
|
||||
}
|
||||
t.mu.Lock()
|
||||
@@ -1727,7 +1764,7 @@ func (t *http2Client) keepalive() {
|
||||
// timeoutLeft. This will ensure that we wait only for kp.Time
|
||||
// before sending out the next ping (for cases where the ping is
|
||||
// acked).
|
||||
sleepDuration := minTime(t.kp.Time, timeoutLeft)
|
||||
sleepDuration := min(t.kp.Time, timeoutLeft)
|
||||
timeoutLeft -= sleepDuration
|
||||
timer.Reset(sleepDuration)
|
||||
case <-t.ctx.Done():
|
||||
@@ -1756,14 +1793,18 @@ func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
|
||||
|
||||
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
|
||||
|
||||
func (t *http2Client) IncrMsgSent() {
|
||||
t.channelz.SocketMetrics.MessagesSent.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
|
||||
func (t *http2Client) incrMsgSent() {
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.MessagesSent.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) IncrMsgRecv() {
|
||||
t.channelz.SocketMetrics.MessagesReceived.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
|
||||
func (t *http2Client) incrMsgRecv() {
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.MessagesReceived.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) getOutFlowWindow() int64 {
|
||||
|
153
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
153
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
@@ -25,6 +25,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
rand "math/rand/v2"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
@@ -34,16 +35,17 @@ import (
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/internal/pretty"
|
||||
"google.golang.org/grpc/internal/syscall"
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
@@ -110,7 +112,7 @@ type http2Server struct {
|
||||
// already initialized since draining is already underway.
|
||||
drainEvent *grpcsync.Event
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
activeStreams map[uint32]*ServerStream
|
||||
// idle is the time instant when the connection went idle.
|
||||
// This is either the beginning of the connection or when the number of
|
||||
// RPCs go down to 0.
|
||||
@@ -119,7 +121,7 @@ type http2Server struct {
|
||||
|
||||
// Fields below are for channelz metric collection.
|
||||
channelz *channelz.Socket
|
||||
bufferPool *bufferPool
|
||||
bufferPool mem.BufferPool
|
||||
|
||||
connectionID uint64
|
||||
|
||||
@@ -255,13 +257,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
inTapHandle: config.InTapHandle,
|
||||
fc: &trInFlow{limit: uint32(icwz)},
|
||||
state: reachable,
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
activeStreams: make(map[uint32]*ServerStream),
|
||||
stats: config.StatsHandlers,
|
||||
kp: kp,
|
||||
idle: time.Now(),
|
||||
kep: kep,
|
||||
initialWindowSize: iwz,
|
||||
bufferPool: newBufferPool(),
|
||||
bufferPool: config.BufferPool,
|
||||
}
|
||||
var czSecurity credentials.ChannelzSecurityValue
|
||||
if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
|
||||
@@ -330,8 +332,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
t.handleSettings(sf)
|
||||
|
||||
go func() {
|
||||
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
|
||||
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
|
||||
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
|
||||
err := t.loopy.run()
|
||||
close(t.loopyWriterDone)
|
||||
if !isIOError(err) {
|
||||
@@ -359,7 +360,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
|
||||
// operateHeaders takes action on the decoded headers. Returns an error if fatal
|
||||
// error encountered and transport needs to close, otherwise returns nil.
|
||||
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
|
||||
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {
|
||||
// Acquire max stream ID lock for entire duration
|
||||
t.maxStreamMu.Lock()
|
||||
defer t.maxStreamMu.Unlock()
|
||||
@@ -385,11 +386,13 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
t.maxStreamID = streamID
|
||||
|
||||
buf := newRecvBuffer()
|
||||
s := &Stream{
|
||||
id: streamID,
|
||||
s := &ServerStream{
|
||||
Stream: &Stream{
|
||||
id: streamID,
|
||||
buf: buf,
|
||||
fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
||||
},
|
||||
st: t,
|
||||
buf: buf,
|
||||
fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
||||
headerWireLength: int(frame.Header().Length),
|
||||
}
|
||||
var (
|
||||
@@ -537,12 +540,6 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
// Attach the received metadata to the context.
|
||||
if len(mdata) > 0 {
|
||||
s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
|
||||
if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
|
||||
s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
|
||||
}
|
||||
if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
|
||||
s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
|
||||
}
|
||||
}
|
||||
t.mu.Lock()
|
||||
if t.state != reachable {
|
||||
@@ -568,7 +565,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
t.logger.Infof("Aborting the stream early: %v", errMsg)
|
||||
}
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: 405,
|
||||
httpStatus: http.StatusMethodNotAllowed,
|
||||
streamID: streamID,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: status.New(codes.Internal, errMsg),
|
||||
@@ -589,7 +586,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
stat = status.New(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: 200,
|
||||
httpStatus: http.StatusOK,
|
||||
streamID: s.id,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: stat,
|
||||
@@ -602,6 +599,22 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
if len(t.activeStreams) == 1 {
|
||||
t.idle = time.Time{}
|
||||
}
|
||||
// Start a timer to close the stream on reaching the deadline.
|
||||
if timeoutSet {
|
||||
// We need to wait for s.cancel to be updated before calling
|
||||
// t.closeStream to avoid data races.
|
||||
cancelUpdated := make(chan struct{})
|
||||
timer := internal.TimeAfterFunc(timeout, func() {
|
||||
<-cancelUpdated
|
||||
t.closeStream(s, true, http2.ErrCodeCancel, false)
|
||||
})
|
||||
oldCancel := s.cancel
|
||||
s.cancel = func() {
|
||||
oldCancel()
|
||||
timer.Stop()
|
||||
}
|
||||
close(cancelUpdated)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.StreamsStarted.Add(1)
|
||||
@@ -614,10 +627,9 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
|
||||
s.trReader = &transportReader{
|
||||
reader: &recvBufferReader{
|
||||
ctx: s.ctx,
|
||||
ctxDone: s.ctxDone,
|
||||
recv: s.buf,
|
||||
freeBuffer: t.bufferPool.put,
|
||||
ctx: s.ctx,
|
||||
ctxDone: s.ctxDone,
|
||||
recv: s.buf,
|
||||
},
|
||||
windowHandler: func(n int) {
|
||||
t.updateWindow(s, uint32(n))
|
||||
@@ -635,7 +647,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
// HandleStreams receives incoming streams using the given handler. This is
|
||||
// typically run in a separate goroutine.
|
||||
// traceCtx attaches trace to ctx and returns the new context.
|
||||
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
|
||||
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {
|
||||
defer func() {
|
||||
close(t.readerDone)
|
||||
<-t.loopyWriterDone
|
||||
@@ -699,7 +711,7 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
|
||||
func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if t.activeStreams == nil {
|
||||
@@ -717,7 +729,7 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
|
||||
// adjustWindow sends out extra window update over the initial window size
|
||||
// of stream if the application is requesting data larger in size than
|
||||
// the window.
|
||||
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
|
||||
func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {
|
||||
if w := s.fc.maybeAdjust(n); w > 0 {
|
||||
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
|
||||
}
|
||||
@@ -727,7 +739,7 @@ func (t *http2Server) adjustWindow(s *Stream, n uint32) {
|
||||
// updateWindow adjusts the inbound quota for the stream and the transport.
|
||||
// Window updates will deliver to the controller for sending when
|
||||
// the cumulative quota exceeds the corresponding threshold.
|
||||
func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
||||
func (t *http2Server) updateWindow(s *ServerStream, n uint32) {
|
||||
if w := s.fc.onRead(n); w > 0 {
|
||||
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
|
||||
increment: w,
|
||||
@@ -814,10 +826,13 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||
// guarantee f.Data() is consumed before the arrival of next frame.
|
||||
// Can this copy be eliminated?
|
||||
if len(f.Data()) > 0 {
|
||||
buffer := t.bufferPool.get()
|
||||
buffer.Reset()
|
||||
buffer.Write(f.Data())
|
||||
s.write(recvMsg{buffer: buffer})
|
||||
pool := t.bufferPool
|
||||
if pool == nil {
|
||||
// Note that this is only supposed to be nil in tests. Otherwise, stream is
|
||||
// always initialized with a BufferPool.
|
||||
pool = mem.DefaultBufferPool()
|
||||
}
|
||||
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
|
||||
}
|
||||
}
|
||||
if f.StreamEnded() {
|
||||
@@ -860,7 +875,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
t.controlBuf.executeAndPut(func(any) bool {
|
||||
t.controlBuf.executeAndPut(func() bool {
|
||||
for _, f := range updateFuncs {
|
||||
f()
|
||||
}
|
||||
@@ -961,7 +976,7 @@ func (t *http2Server) checkForHeaderListSize(it any) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *http2Server) streamContextErr(s *Stream) error {
|
||||
func (t *http2Server) streamContextErr(s *ServerStream) error {
|
||||
select {
|
||||
case <-t.done:
|
||||
return ErrConnClosing
|
||||
@@ -971,7 +986,7 @@ func (t *http2Server) streamContextErr(s *Stream) error {
|
||||
}
|
||||
|
||||
// WriteHeader sends the header metadata md back to the client.
|
||||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
|
||||
s.hdrMu.Lock()
|
||||
defer s.hdrMu.Unlock()
|
||||
if s.getState() == streamDone {
|
||||
@@ -1004,7 +1019,7 @@ func (t *http2Server) setResetPingStrikes() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
}
|
||||
|
||||
func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
|
||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||
// first and create a slice of that exact size.
|
||||
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
|
||||
@@ -1014,12 +1029,13 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
|
||||
}
|
||||
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
|
||||
success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
|
||||
hf := &headerFrame{
|
||||
streamID: s.id,
|
||||
hf: headerFields,
|
||||
endStream: false,
|
||||
onWrite: t.setResetPingStrikes,
|
||||
})
|
||||
}
|
||||
success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
|
||||
if !success {
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1043,7 +1059,7 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
// There is no further I/O operations being able to perform on this stream.
|
||||
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
|
||||
// OK is adopted.
|
||||
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
|
||||
s.hdrMu.Lock()
|
||||
defer s.hdrMu.Unlock()
|
||||
|
||||
@@ -1089,7 +1105,9 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
onWrite: t.setResetPingStrikes,
|
||||
}
|
||||
|
||||
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
|
||||
success, err := t.controlBuf.executeAndPut(func() bool {
|
||||
return t.checkForHeaderListSize(trailingHeader)
|
||||
}, nil)
|
||||
if !success {
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1112,27 +1130,38 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
|
||||
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
|
||||
// is returns if it fails (e.g., framing error, transport error).
|
||||
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
||||
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
|
||||
reader := data.Reader()
|
||||
|
||||
if !s.isHeaderSent() { // Headers haven't been written yet.
|
||||
if err := t.WriteHeader(s, nil); err != nil {
|
||||
if err := t.writeHeader(s, nil); err != nil {
|
||||
_ = reader.Close()
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Writing headers checks for this condition.
|
||||
if s.getState() == streamDone {
|
||||
_ = reader.Close()
|
||||
return t.streamContextErr(s)
|
||||
}
|
||||
}
|
||||
|
||||
df := &dataFrame{
|
||||
streamID: s.id,
|
||||
h: hdr,
|
||||
d: data,
|
||||
reader: reader,
|
||||
onEachWrite: t.setResetPingStrikes,
|
||||
}
|
||||
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
|
||||
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
|
||||
_ = reader.Close()
|
||||
return t.streamContextErr(s)
|
||||
}
|
||||
return t.controlBuf.put(df)
|
||||
if err := t.controlBuf.put(df); err != nil {
|
||||
_ = reader.Close()
|
||||
return err
|
||||
}
|
||||
t.incrMsgSent()
|
||||
return nil
|
||||
}
|
||||
|
||||
// keepalive running in a separate goroutine does the following:
|
||||
@@ -1208,7 +1237,7 @@ func (t *http2Server) keepalive() {
|
||||
continue
|
||||
}
|
||||
if outstandingPing && kpTimeoutLeft <= 0 {
|
||||
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
|
||||
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
|
||||
return
|
||||
}
|
||||
if !outstandingPing {
|
||||
@@ -1223,7 +1252,7 @@ func (t *http2Server) keepalive() {
|
||||
// timeoutLeft. This will ensure that we wait only for kp.Time
|
||||
// before sending out the next ping (for cases where the ping is
|
||||
// acked).
|
||||
sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
|
||||
sleepDuration := min(t.kp.Time, kpTimeoutLeft)
|
||||
kpTimeoutLeft -= sleepDuration
|
||||
kpTimer.Reset(sleepDuration)
|
||||
case <-t.done:
|
||||
@@ -1261,8 +1290,7 @@ func (t *http2Server) Close(err error) {
|
||||
}
|
||||
|
||||
// deleteStream deletes the stream s from transport's active streams.
|
||||
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
|
||||
|
||||
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
|
||||
t.mu.Lock()
|
||||
if _, ok := t.activeStreams[s.id]; ok {
|
||||
delete(t.activeStreams, s.id)
|
||||
@@ -1282,7 +1310,7 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
|
||||
}
|
||||
|
||||
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
|
||||
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
|
||||
func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
@@ -1306,13 +1334,16 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h
|
||||
}
|
||||
|
||||
// closeStream clears the footprint of a stream when the stream is not needed any more.
|
||||
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
|
||||
func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
s.cancel()
|
||||
|
||||
s.swapState(streamDone)
|
||||
oldState := s.swapState(streamDone)
|
||||
if oldState == streamDone {
|
||||
return
|
||||
}
|
||||
t.deleteStream(s, eosReceived)
|
||||
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
@@ -1400,14 +1431,18 @@ func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) IncrMsgSent() {
|
||||
t.channelz.SocketMetrics.MessagesSent.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
|
||||
func (t *http2Server) incrMsgSent() {
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.MessagesSent.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) IncrMsgRecv() {
|
||||
t.channelz.SocketMetrics.MessagesReceived.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
|
||||
func (t *http2Server) incrMsgRecv() {
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.MessagesReceived.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) getOutFlowWindow() int64 {
|
||||
@@ -1440,7 +1475,7 @@ func getJitter(v time.Duration) time.Duration {
|
||||
}
|
||||
// Generate a jitter between +/- 10% of the value.
|
||||
r := int64(v / 10)
|
||||
j := grpcrand.Int63n(2*r) - r
|
||||
j := rand.Int64N(2*r) - r
|
||||
return time.Duration(j)
|
||||
}
|
||||
|
||||
|
28
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
28
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
@@ -317,28 +317,32 @@ func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *bufWriter) Write(b []byte) (n int, err error) {
|
||||
func (w *bufWriter) Write(b []byte) (int, error) {
|
||||
if w.err != nil {
|
||||
return 0, w.err
|
||||
}
|
||||
if w.batchSize == 0 { // Buffer has been disabled.
|
||||
n, err = w.conn.Write(b)
|
||||
n, err := w.conn.Write(b)
|
||||
return n, toIOError(err)
|
||||
}
|
||||
if w.buf == nil {
|
||||
b := w.pool.Get().(*[]byte)
|
||||
w.buf = *b
|
||||
}
|
||||
written := 0
|
||||
for len(b) > 0 {
|
||||
nn := copy(w.buf[w.offset:], b)
|
||||
b = b[nn:]
|
||||
w.offset += nn
|
||||
n += nn
|
||||
if w.offset >= w.batchSize {
|
||||
err = w.flushKeepBuffer()
|
||||
copied := copy(w.buf[w.offset:], b)
|
||||
b = b[copied:]
|
||||
written += copied
|
||||
w.offset += copied
|
||||
if w.offset < w.batchSize {
|
||||
continue
|
||||
}
|
||||
if err := w.flushKeepBuffer(); err != nil {
|
||||
return written, err
|
||||
}
|
||||
}
|
||||
return n, err
|
||||
return written, nil
|
||||
}
|
||||
|
||||
func (w *bufWriter) Flush() error {
|
||||
@@ -389,7 +393,7 @@ type framer struct {
|
||||
fr *http2.Framer
|
||||
}
|
||||
|
||||
var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool)
|
||||
var writeBufferPoolMap = make(map[int]*sync.Pool)
|
||||
var writeBufferMutex sync.Mutex
|
||||
|
||||
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
|
||||
@@ -435,8 +439,8 @@ func getWriteBufferPool(size int) *sync.Pool {
|
||||
return pool
|
||||
}
|
||||
|
||||
// parseDialTarget returns the network and address to pass to dialer.
|
||||
func parseDialTarget(target string) (string, string) {
|
||||
// ParseDialTarget returns the network and address to pass to dialer.
|
||||
func ParseDialTarget(target string) (string, string) {
|
||||
net := "tcp"
|
||||
m1 := strings.Index(target, ":")
|
||||
m2 := strings.Index(target, ":/")
|
||||
|
72
vendor/google.golang.org/grpc/internal/transport/proxy.go
generated
vendored
72
vendor/google.golang.org/grpc/internal/transport/proxy.go
generated
vendored
@@ -30,34 +30,16 @@ import (
|
||||
"net/url"
|
||||
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/proxyattributes"
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
const proxyAuthHeaderKey = "Proxy-Authorization"
|
||||
|
||||
var (
|
||||
// The following variable will be overwritten in the tests.
|
||||
httpProxyFromEnvironment = http.ProxyFromEnvironment
|
||||
)
|
||||
|
||||
func mapAddress(address string) (*url.URL, error) {
|
||||
req := &http.Request{
|
||||
URL: &url.URL{
|
||||
Scheme: "https",
|
||||
Host: address,
|
||||
},
|
||||
}
|
||||
url, err := httpProxyFromEnvironment(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return url, nil
|
||||
}
|
||||
|
||||
// To read a response from a net.Conn, http.ReadResponse() takes a bufio.Reader.
|
||||
// It's possible that this reader reads more than what's need for the response and stores
|
||||
// those bytes in the buffer.
|
||||
// bufConn wraps the original net.Conn and the bufio.Reader to make sure we don't lose the
|
||||
// bytes in the buffer.
|
||||
// It's possible that this reader reads more than what's need for the response
|
||||
// and stores those bytes in the buffer. bufConn wraps the original net.Conn
|
||||
// and the bufio.Reader to make sure we don't lose the bytes in the buffer.
|
||||
type bufConn struct {
|
||||
net.Conn
|
||||
r io.Reader
|
||||
@@ -72,7 +54,7 @@ func basicAuth(username, password string) string {
|
||||
return base64.StdEncoding.EncodeToString([]byte(auth))
|
||||
}
|
||||
|
||||
func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) {
|
||||
func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, grpcUA string, opts proxyattributes.Options) (_ net.Conn, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
@@ -81,15 +63,14 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
|
||||
|
||||
req := &http.Request{
|
||||
Method: http.MethodConnect,
|
||||
URL: &url.URL{Host: backendAddr},
|
||||
URL: &url.URL{Host: opts.ConnectAddr},
|
||||
Header: map[string][]string{"User-Agent": {grpcUA}},
|
||||
}
|
||||
if t := proxyURL.User; t != nil {
|
||||
u := t.Username()
|
||||
p, _ := t.Password()
|
||||
if user := opts.User; user != nil {
|
||||
u := user.Username()
|
||||
p, _ := user.Password()
|
||||
req.Header.Add(proxyAuthHeaderKey, "Basic "+basicAuth(u, p))
|
||||
}
|
||||
|
||||
if err := sendHTTPRequest(ctx, req, conn); err != nil {
|
||||
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
|
||||
}
|
||||
@@ -107,32 +88,23 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
|
||||
}
|
||||
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
|
||||
}
|
||||
|
||||
return &bufConn{Conn: conn, r: r}, nil
|
||||
// The buffer could contain extra bytes from the target server, so we can't
|
||||
// discard it. However, in many cases where the server waits for the client
|
||||
// to send the first message (e.g. when TLS is being used), the buffer will
|
||||
// be empty, so we can avoid the overhead of reading through this buffer.
|
||||
if r.Buffered() != 0 {
|
||||
return &bufConn{Conn: conn, r: r}, nil
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy
|
||||
// is necessary, dials, does the HTTP CONNECT handshake, and returns the
|
||||
// connection.
|
||||
func proxyDial(ctx context.Context, addr string, grpcUA string) (net.Conn, error) {
|
||||
newAddr := addr
|
||||
proxyURL, err := mapAddress(addr)
|
||||
// proxyDial establishes a TCP connection to the specified address and performs an HTTP CONNECT handshake.
|
||||
func proxyDial(ctx context.Context, addr resolver.Address, grpcUA string, opts proxyattributes.Options) (net.Conn, error) {
|
||||
conn, err := internal.NetDialerWithTCPKeepalive().DialContext(ctx, "tcp", addr.Addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if proxyURL != nil {
|
||||
newAddr = proxyURL.Host
|
||||
}
|
||||
|
||||
conn, err := internal.NetDialerWithTCPKeepalive().DialContext(ctx, "tcp", newAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if proxyURL == nil {
|
||||
// proxy is disabled if proxyURL is nil.
|
||||
return conn, err
|
||||
}
|
||||
return doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
|
||||
return doHTTPConnectHandshake(ctx, conn, grpcUA, opts)
|
||||
}
|
||||
|
||||
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
|
||||
|
180
vendor/google.golang.org/grpc/internal/transport/server_stream.go
generated
vendored
Normal file
180
vendor/google.golang.org/grpc/internal/transport/server_stream.go
generated
vendored
Normal file
@@ -0,0 +1,180 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2024 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ServerStream implements streaming functionality for a gRPC server.
|
||||
type ServerStream struct {
|
||||
*Stream // Embed for common stream functionality.
|
||||
|
||||
st internalServerTransport
|
||||
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
|
||||
// cancel is invoked at the end of stream to cancel ctx. It also stops the
|
||||
// timer for monitoring the rpc deadline if configured.
|
||||
cancel func()
|
||||
|
||||
// Holds compressor names passed in grpc-accept-encoding metadata from the
|
||||
// client.
|
||||
clientAdvertisedCompressors string
|
||||
headerWireLength int
|
||||
|
||||
// hdrMu protects outgoing header and trailer metadata.
|
||||
hdrMu sync.Mutex
|
||||
header metadata.MD // the outgoing header metadata. Updated by WriteHeader.
|
||||
headerSent atomic.Bool // atomically set when the headers are sent out.
|
||||
}
|
||||
|
||||
// Read reads an n byte message from the input stream.
|
||||
func (s *ServerStream) Read(n int) (mem.BufferSlice, error) {
|
||||
b, err := s.Stream.read(n)
|
||||
if err == nil {
|
||||
s.st.incrMsgRecv()
|
||||
}
|
||||
return b, err
|
||||
}
|
||||
|
||||
// SendHeader sends the header metadata for the given stream.
|
||||
func (s *ServerStream) SendHeader(md metadata.MD) error {
|
||||
return s.st.writeHeader(s, md)
|
||||
}
|
||||
|
||||
// Write writes the hdr and data bytes to the output stream.
|
||||
func (s *ServerStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
|
||||
return s.st.write(s, hdr, data, opts)
|
||||
}
|
||||
|
||||
// WriteStatus sends the status of a stream to the client. WriteStatus is
|
||||
// the final call made on a stream and always occurs.
|
||||
func (s *ServerStream) WriteStatus(st *status.Status) error {
|
||||
return s.st.writeStatus(s, st)
|
||||
}
|
||||
|
||||
// isHeaderSent indicates whether headers have been sent.
|
||||
func (s *ServerStream) isHeaderSent() bool {
|
||||
return s.headerSent.Load()
|
||||
}
|
||||
|
||||
// updateHeaderSent updates headerSent and returns true
|
||||
// if it was already set.
|
||||
func (s *ServerStream) updateHeaderSent() bool {
|
||||
return s.headerSent.Swap(true)
|
||||
}
|
||||
|
||||
// RecvCompress returns the compression algorithm applied to the inbound
|
||||
// message. It is empty string if there is no compression applied.
|
||||
func (s *ServerStream) RecvCompress() string {
|
||||
return s.recvCompress
|
||||
}
|
||||
|
||||
// SendCompress returns the send compressor name.
|
||||
func (s *ServerStream) SendCompress() string {
|
||||
return s.sendCompress
|
||||
}
|
||||
|
||||
// ContentSubtype returns the content-subtype for a request. For example, a
|
||||
// content-subtype of "proto" will result in a content-type of
|
||||
// "application/grpc+proto". This will always be lowercase. See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
||||
// more details.
|
||||
func (s *ServerStream) ContentSubtype() string {
|
||||
return s.contentSubtype
|
||||
}
|
||||
|
||||
// SetSendCompress sets the compression algorithm to the stream.
|
||||
func (s *ServerStream) SetSendCompress(name string) error {
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return errors.New("transport: set send compressor called after headers sent or stream done")
|
||||
}
|
||||
|
||||
s.sendCompress = name
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetContext sets the context of the stream. This will be deleted once the
|
||||
// stats handler callouts all move to gRPC layer.
|
||||
func (s *ServerStream) SetContext(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
}
|
||||
|
||||
// ClientAdvertisedCompressors returns the compressor names advertised by the
|
||||
// client via grpc-accept-encoding header.
|
||||
func (s *ServerStream) ClientAdvertisedCompressors() []string {
|
||||
values := strings.Split(s.clientAdvertisedCompressors, ",")
|
||||
for i, v := range values {
|
||||
values[i] = strings.TrimSpace(v)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// Header returns the header metadata of the stream. It returns the out header
|
||||
// after t.WriteHeader is called. It does not block and must not be called
|
||||
// until after WriteHeader.
|
||||
func (s *ServerStream) Header() (metadata.MD, error) {
|
||||
// Return the header in stream. It will be the out
|
||||
// header after t.WriteHeader is called.
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
|
||||
// HeaderWireLength returns the size of the headers of the stream as received
|
||||
// from the wire.
|
||||
func (s *ServerStream) HeaderWireLength() int {
|
||||
return s.headerWireLength
|
||||
}
|
||||
|
||||
// SetHeader sets the header metadata. This can be called multiple times.
|
||||
// This should not be called in parallel to other data writes.
|
||||
func (s *ServerStream) SetHeader(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.header = metadata.Join(s.header, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetTrailer sets the trailer metadata which will be sent with the RPC status
|
||||
// by the server. This can be called multiple times.
|
||||
// This should not be called parallel to other data writes.
|
||||
func (s *ServerStream) SetTrailer(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.trailer = metadata.Join(s.trailer, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
522
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
522
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@@ -22,13 +22,11 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -37,9 +35,9 @@ import (
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/tap"
|
||||
@@ -47,32 +45,10 @@ import (
|
||||
|
||||
const logLevel = 2
|
||||
|
||||
type bufferPool struct {
|
||||
pool sync.Pool
|
||||
}
|
||||
|
||||
func newBufferPool() *bufferPool {
|
||||
return &bufferPool{
|
||||
pool: sync.Pool{
|
||||
New: func() any {
|
||||
return new(bytes.Buffer)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (p *bufferPool) get() *bytes.Buffer {
|
||||
return p.pool.Get().(*bytes.Buffer)
|
||||
}
|
||||
|
||||
func (p *bufferPool) put(b *bytes.Buffer) {
|
||||
p.pool.Put(b)
|
||||
}
|
||||
|
||||
// recvMsg represents the received msg from the transport. All transport
|
||||
// protocol specific info has been removed.
|
||||
type recvMsg struct {
|
||||
buffer *bytes.Buffer
|
||||
buffer mem.Buffer
|
||||
// nil: received some data
|
||||
// io.EOF: stream is completed. data is nil.
|
||||
// other non-nil error: transport failure. data is nil.
|
||||
@@ -102,6 +78,9 @@ func newRecvBuffer() *recvBuffer {
|
||||
func (b *recvBuffer) put(r recvMsg) {
|
||||
b.mu.Lock()
|
||||
if b.err != nil {
|
||||
// drop the buffer on the floor. Since b.err is not nil, any subsequent reads
|
||||
// will always return an error, making this buffer inaccessible.
|
||||
r.buffer.Free()
|
||||
b.mu.Unlock()
|
||||
// An error had occurred earlier, don't accept more
|
||||
// data or errors.
|
||||
@@ -148,45 +127,70 @@ type recvBufferReader struct {
|
||||
ctx context.Context
|
||||
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
|
||||
recv *recvBuffer
|
||||
last *bytes.Buffer // Stores the remaining data in the previous calls.
|
||||
last mem.Buffer // Stores the remaining data in the previous calls.
|
||||
err error
|
||||
freeBuffer func(*bytes.Buffer)
|
||||
}
|
||||
|
||||
// Read reads the next len(p) bytes from last. If last is drained, it tries to
|
||||
// read additional data from recv. It blocks if there no additional data available
|
||||
// in recv. If Read returns any non-nil error, it will continue to return that error.
|
||||
func (r *recvBufferReader) Read(p []byte) (n int, err error) {
|
||||
func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
|
||||
if r.err != nil {
|
||||
return 0, r.err
|
||||
}
|
||||
if r.last != nil {
|
||||
// Read remaining data left in last call.
|
||||
copied, _ := r.last.Read(p)
|
||||
if r.last.Len() == 0 {
|
||||
r.freeBuffer(r.last)
|
||||
r.last = nil
|
||||
}
|
||||
return copied, nil
|
||||
n, r.last = mem.ReadUnsafe(header, r.last)
|
||||
return n, nil
|
||||
}
|
||||
if r.closeStream != nil {
|
||||
n, r.err = r.readClient(p)
|
||||
n, r.err = r.readMessageHeaderClient(header)
|
||||
} else {
|
||||
n, r.err = r.read(p)
|
||||
n, r.err = r.readMessageHeader(header)
|
||||
}
|
||||
return n, r.err
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) read(p []byte) (n int, err error) {
|
||||
// Read reads the next n bytes from last. If last is drained, it tries to read
|
||||
// additional data from recv. It blocks if there no additional data available in
|
||||
// recv. If Read returns any non-nil error, it will continue to return that
|
||||
// error.
|
||||
func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
|
||||
if r.err != nil {
|
||||
return nil, r.err
|
||||
}
|
||||
if r.last != nil {
|
||||
buf = r.last
|
||||
if r.last.Len() > n {
|
||||
buf, r.last = mem.SplitUnsafe(buf, n)
|
||||
} else {
|
||||
r.last = nil
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
if r.closeStream != nil {
|
||||
buf, r.err = r.readClient(n)
|
||||
} else {
|
||||
buf, r.err = r.read(n)
|
||||
}
|
||||
return buf, r.err
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readMessageHeader(header []byte) (n int, err error) {
|
||||
select {
|
||||
case <-r.ctxDone:
|
||||
return 0, ContextErr(r.ctx.Err())
|
||||
case m := <-r.recv.get():
|
||||
return r.readAdditional(m, p)
|
||||
return r.readMessageHeaderAdditional(m, header)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
|
||||
func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
|
||||
select {
|
||||
case <-r.ctxDone:
|
||||
return nil, ContextErr(r.ctx.Err())
|
||||
case m := <-r.recv.get():
|
||||
return r.readAdditional(m, n)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) {
|
||||
// If the context is canceled, then closes the stream with nil metadata.
|
||||
// closeStream writes its error parameter to r.recv as a recvMsg.
|
||||
// r.readAdditional acts on that message and returns the necessary error.
|
||||
@@ -207,25 +211,67 @@ func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
|
||||
// faster.
|
||||
r.closeStream(ContextErr(r.ctx.Err()))
|
||||
m := <-r.recv.get()
|
||||
return r.readAdditional(m, p)
|
||||
return r.readMessageHeaderAdditional(m, header)
|
||||
case m := <-r.recv.get():
|
||||
return r.readAdditional(m, p)
|
||||
return r.readMessageHeaderAdditional(m, header)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
|
||||
func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
|
||||
// If the context is canceled, then closes the stream with nil metadata.
|
||||
// closeStream writes its error parameter to r.recv as a recvMsg.
|
||||
// r.readAdditional acts on that message and returns the necessary error.
|
||||
select {
|
||||
case <-r.ctxDone:
|
||||
// Note that this adds the ctx error to the end of recv buffer, and
|
||||
// reads from the head. This will delay the error until recv buffer is
|
||||
// empty, thus will delay ctx cancellation in Recv().
|
||||
//
|
||||
// It's done this way to fix a race between ctx cancel and trailer. The
|
||||
// race was, stream.Recv() may return ctx error if ctxDone wins the
|
||||
// race, but stream.Trailer() may return a non-nil md because the stream
|
||||
// was not marked as done when trailer is received. This closeStream
|
||||
// call will mark stream as done, thus fix the race.
|
||||
//
|
||||
// TODO: delaying ctx error seems like a unnecessary side effect. What
|
||||
// we really want is to mark the stream as done, and return ctx error
|
||||
// faster.
|
||||
r.closeStream(ContextErr(r.ctx.Err()))
|
||||
m := <-r.recv.get()
|
||||
return r.readAdditional(m, n)
|
||||
case m := <-r.recv.get():
|
||||
return r.readAdditional(m, n)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
|
||||
r.recv.load()
|
||||
if m.err != nil {
|
||||
if m.buffer != nil {
|
||||
m.buffer.Free()
|
||||
}
|
||||
return 0, m.err
|
||||
}
|
||||
copied, _ := m.buffer.Read(p)
|
||||
if m.buffer.Len() == 0 {
|
||||
r.freeBuffer(m.buffer)
|
||||
r.last = nil
|
||||
} else {
|
||||
r.last = m.buffer
|
||||
|
||||
n, r.last = mem.ReadUnsafe(header, m.buffer)
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readAdditional(m recvMsg, n int) (b mem.Buffer, err error) {
|
||||
r.recv.load()
|
||||
if m.err != nil {
|
||||
if m.buffer != nil {
|
||||
m.buffer.Free()
|
||||
}
|
||||
return nil, m.err
|
||||
}
|
||||
return copied, nil
|
||||
|
||||
if m.buffer.Len() > n {
|
||||
m.buffer, r.last = mem.SplitUnsafe(m.buffer, n)
|
||||
}
|
||||
|
||||
return m.buffer, nil
|
||||
}
|
||||
|
||||
type streamState uint32
|
||||
@@ -240,73 +286,26 @@ const (
|
||||
// Stream represents an RPC in the transport layer.
|
||||
type Stream struct {
|
||||
id uint32
|
||||
st ServerTransport // nil for client side Stream
|
||||
ct *http2Client // nil for server side Stream
|
||||
ctx context.Context // the associated context of the stream
|
||||
cancel context.CancelFunc // always nil for client side Stream
|
||||
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
|
||||
doneFunc func() // invoked at the end of stream on client side.
|
||||
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
|
||||
method string // the associated RPC method of the stream
|
||||
ctx context.Context // the associated context of the stream
|
||||
method string // the associated RPC method of the stream
|
||||
recvCompress string
|
||||
sendCompress string
|
||||
buf *recvBuffer
|
||||
trReader io.Reader
|
||||
trReader *transportReader
|
||||
fc *inFlow
|
||||
wq *writeQuota
|
||||
|
||||
// Holds compressor names passed in grpc-accept-encoding metadata from the
|
||||
// client. This is empty for the client side stream.
|
||||
clientAdvertisedCompressors string
|
||||
// Callback to state application's intentions to read data. This
|
||||
// is used to adjust flow control, if needed.
|
||||
requestRead func(int)
|
||||
|
||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||
// headerValid indicates whether a valid header was received. Only
|
||||
// meaningful after headerChan is closed (always call waitOnHeader() before
|
||||
// reading its value). Not valid on server side.
|
||||
headerValid bool
|
||||
headerWireLength int // Only set on server side.
|
||||
|
||||
// hdrMu protects header and trailer metadata on the server-side.
|
||||
hdrMu sync.Mutex
|
||||
// On client side, header keeps the received header metadata.
|
||||
//
|
||||
// On server side, header keeps the header set by SetHeader(). The complete
|
||||
// header will merged into this after t.WriteHeader() is called.
|
||||
header metadata.MD
|
||||
trailer metadata.MD // the key-value map of trailer metadata.
|
||||
|
||||
noHeaders bool // set if the client never received headers (set only after the stream is done).
|
||||
|
||||
// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
|
||||
headerSent uint32
|
||||
|
||||
state streamState
|
||||
|
||||
// On client-side it is the status error received from the server.
|
||||
// On server-side it is unused.
|
||||
status *status.Status
|
||||
|
||||
bytesReceived uint32 // indicates whether any bytes have been received on this stream
|
||||
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
|
||||
|
||||
// contentSubtype is the content-subtype for requests.
|
||||
// this must be lowercase or the behavior is undefined.
|
||||
contentSubtype string
|
||||
}
|
||||
|
||||
// isHeaderSent is only valid on the server-side.
|
||||
func (s *Stream) isHeaderSent() bool {
|
||||
return atomic.LoadUint32(&s.headerSent) == 1
|
||||
}
|
||||
|
||||
// updateHeaderSent updates headerSent and returns true
|
||||
// if it was alreay set. It is valid only on server-side.
|
||||
func (s *Stream) updateHeaderSent() bool {
|
||||
return atomic.SwapUint32(&s.headerSent, 1) == 1
|
||||
trailer metadata.MD // the key-value map of trailer metadata.
|
||||
}
|
||||
|
||||
func (s *Stream) swapState(st streamState) streamState {
|
||||
@@ -321,110 +320,12 @@ func (s *Stream) getState() streamState {
|
||||
return streamState(atomic.LoadUint32((*uint32)(&s.state)))
|
||||
}
|
||||
|
||||
func (s *Stream) waitOnHeader() {
|
||||
if s.headerChan == nil {
|
||||
// On the server headerChan is always nil since a stream originates
|
||||
// only after having received headers.
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
// Close the stream to prevent headers/trailers from changing after
|
||||
// this function returns.
|
||||
s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
|
||||
// headerChan could possibly not be closed yet if closeStream raced
|
||||
// with operateHeaders; wait until it is closed explicitly here.
|
||||
<-s.headerChan
|
||||
case <-s.headerChan:
|
||||
}
|
||||
}
|
||||
|
||||
// RecvCompress returns the compression algorithm applied to the inbound
|
||||
// message. It is empty string if there is no compression applied.
|
||||
func (s *Stream) RecvCompress() string {
|
||||
s.waitOnHeader()
|
||||
return s.recvCompress
|
||||
}
|
||||
|
||||
// SetSendCompress sets the compression algorithm to the stream.
|
||||
func (s *Stream) SetSendCompress(name string) error {
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return errors.New("transport: set send compressor called after headers sent or stream done")
|
||||
}
|
||||
|
||||
s.sendCompress = name
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendCompress returns the send compressor name.
|
||||
func (s *Stream) SendCompress() string {
|
||||
return s.sendCompress
|
||||
}
|
||||
|
||||
// ClientAdvertisedCompressors returns the compressor names advertised by the
|
||||
// client via grpc-accept-encoding header.
|
||||
func (s *Stream) ClientAdvertisedCompressors() []string {
|
||||
values := strings.Split(s.clientAdvertisedCompressors, ",")
|
||||
for i, v := range values {
|
||||
values[i] = strings.TrimSpace(v)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// Done returns a channel which is closed when it receives the final status
|
||||
// from the server.
|
||||
func (s *Stream) Done() <-chan struct{} {
|
||||
return s.done
|
||||
}
|
||||
|
||||
// Header returns the header metadata of the stream.
|
||||
//
|
||||
// On client side, it acquires the key-value pairs of header metadata once it is
|
||||
// available. It blocks until i) the metadata is ready or ii) there is no header
|
||||
// metadata or iii) the stream is canceled/expired.
|
||||
//
|
||||
// On server side, it returns the out header after t.WriteHeader is called. It
|
||||
// does not block and must not be called until after WriteHeader.
|
||||
func (s *Stream) Header() (metadata.MD, error) {
|
||||
if s.headerChan == nil {
|
||||
// On server side, return the header in stream. It will be the out
|
||||
// header after t.WriteHeader is called.
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
s.waitOnHeader()
|
||||
|
||||
if !s.headerValid || s.noHeaders {
|
||||
return nil, s.status.Err()
|
||||
}
|
||||
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
|
||||
// TrailersOnly blocks until a header or trailers-only frame is received and
|
||||
// then returns true if the stream was trailers-only. If the stream ends
|
||||
// before headers are received, returns true, nil. Client-side only.
|
||||
func (s *Stream) TrailersOnly() bool {
|
||||
s.waitOnHeader()
|
||||
return s.noHeaders
|
||||
}
|
||||
|
||||
// Trailer returns the cached trailer metedata. Note that if it is not called
|
||||
// after the entire stream is done, it could return an empty MD. Client
|
||||
// side only.
|
||||
// Trailer returns the cached trailer metadata. Note that if it is not called
|
||||
// after the entire stream is done, it could return an empty MD.
|
||||
// It can be safely read only after stream has ended that is either read
|
||||
// or write have returned io.EOF.
|
||||
func (s *Stream) Trailer() metadata.MD {
|
||||
c := s.trailer.Copy()
|
||||
return c
|
||||
}
|
||||
|
||||
// ContentSubtype returns the content-subtype for a request. For example, a
|
||||
// content-subtype of "proto" will result in a content-type of
|
||||
// "application/grpc+proto". This will always be lowercase. See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
||||
// more details.
|
||||
func (s *Stream) ContentSubtype() string {
|
||||
return s.contentSubtype
|
||||
return s.trailer.Copy()
|
||||
}
|
||||
|
||||
// Context returns the context of the stream.
|
||||
@@ -432,114 +333,104 @@ func (s *Stream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
// SetContext sets the context of the stream. This will be deleted once the
|
||||
// stats handler callouts all move to gRPC layer.
|
||||
func (s *Stream) SetContext(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
}
|
||||
|
||||
// Method returns the method for the stream.
|
||||
func (s *Stream) Method() string {
|
||||
return s.method
|
||||
}
|
||||
|
||||
// Status returns the status received from the server.
|
||||
// Status can be read safely only after the stream has ended,
|
||||
// that is, after Done() is closed.
|
||||
func (s *Stream) Status() *status.Status {
|
||||
return s.status
|
||||
}
|
||||
|
||||
// HeaderWireLength returns the size of the headers of the stream as received
|
||||
// from the wire. Valid only on the server.
|
||||
func (s *Stream) HeaderWireLength() int {
|
||||
return s.headerWireLength
|
||||
}
|
||||
|
||||
// SetHeader sets the header metadata. This can be called multiple times.
|
||||
// Server side only.
|
||||
// This should not be called in parallel to other data writes.
|
||||
func (s *Stream) SetHeader(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.header = metadata.Join(s.header, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendHeader sends the given header metadata. The given metadata is
|
||||
// combined with any metadata set by previous calls to SetHeader and
|
||||
// then written to the transport stream.
|
||||
func (s *Stream) SendHeader(md metadata.MD) error {
|
||||
return s.st.WriteHeader(s, md)
|
||||
}
|
||||
|
||||
// SetTrailer sets the trailer metadata which will be sent with the RPC status
|
||||
// by the server. This can be called multiple times. Server side only.
|
||||
// This should not be called parallel to other data writes.
|
||||
func (s *Stream) SetTrailer(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.trailer = metadata.Join(s.trailer, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Stream) write(m recvMsg) {
|
||||
s.buf.put(m)
|
||||
}
|
||||
|
||||
// Read reads all p bytes from the wire for this stream.
|
||||
func (s *Stream) Read(p []byte) (n int, err error) {
|
||||
// ReadMessageHeader reads data into the provided header slice from the stream.
|
||||
// It first checks if there was an error during a previous read operation and
|
||||
// returns it if present. It then requests a read operation for the length of
|
||||
// the header. It continues to read from the stream until the entire header
|
||||
// slice is filled or an error occurs. If an `io.EOF` error is encountered with
|
||||
// partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an
|
||||
// unexpected end of the stream. The method returns any error encountered during
|
||||
// the read process or nil if the header was successfully read.
|
||||
func (s *Stream) ReadMessageHeader(header []byte) (err error) {
|
||||
// Don't request a read if there was an error earlier
|
||||
if er := s.trReader.(*transportReader).er; er != nil {
|
||||
return 0, er
|
||||
if er := s.trReader.er; er != nil {
|
||||
return er
|
||||
}
|
||||
s.requestRead(len(p))
|
||||
return io.ReadFull(s.trReader, p)
|
||||
s.requestRead(len(header))
|
||||
for len(header) != 0 {
|
||||
n, err := s.trReader.ReadMessageHeader(header)
|
||||
header = header[n:]
|
||||
if len(header) == 0 {
|
||||
err = nil
|
||||
}
|
||||
if err != nil {
|
||||
if n > 0 && err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// tranportReader reads all the data available for this Stream from the transport and
|
||||
// Read reads n bytes from the wire for this stream.
|
||||
func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
|
||||
// Don't request a read if there was an error earlier
|
||||
if er := s.trReader.er; er != nil {
|
||||
return nil, er
|
||||
}
|
||||
s.requestRead(n)
|
||||
for n != 0 {
|
||||
buf, err := s.trReader.Read(n)
|
||||
var bufLen int
|
||||
if buf != nil {
|
||||
bufLen = buf.Len()
|
||||
}
|
||||
n -= bufLen
|
||||
if n == 0 {
|
||||
err = nil
|
||||
}
|
||||
if err != nil {
|
||||
if bufLen > 0 && err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
data.Free()
|
||||
return nil, err
|
||||
}
|
||||
data = append(data, buf)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// transportReader reads all the data available for this Stream from the transport and
|
||||
// passes them into the decoder, which converts them into a gRPC message stream.
|
||||
// The error is io.EOF when the stream is done or another non-nil error if
|
||||
// the stream broke.
|
||||
type transportReader struct {
|
||||
reader io.Reader
|
||||
reader *recvBufferReader
|
||||
// The handler to control the window update procedure for both this
|
||||
// particular stream and the associated transport.
|
||||
windowHandler func(int)
|
||||
er error
|
||||
}
|
||||
|
||||
func (t *transportReader) Read(p []byte) (n int, err error) {
|
||||
n, err = t.reader.Read(p)
|
||||
func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
|
||||
n, err := t.reader.ReadMessageHeader(header)
|
||||
if err != nil {
|
||||
t.er = err
|
||||
return
|
||||
return 0, err
|
||||
}
|
||||
t.windowHandler(n)
|
||||
return
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// BytesReceived indicates whether any bytes have been received on this stream.
|
||||
func (s *Stream) BytesReceived() bool {
|
||||
return atomic.LoadUint32(&s.bytesReceived) == 1
|
||||
}
|
||||
|
||||
// Unprocessed indicates whether the server did not process this stream --
|
||||
// i.e. it sent a refused stream or GOAWAY including this stream ID.
|
||||
func (s *Stream) Unprocessed() bool {
|
||||
return atomic.LoadUint32(&s.unprocessed) == 1
|
||||
func (t *transportReader) Read(n int) (mem.Buffer, error) {
|
||||
buf, err := t.reader.Read(n)
|
||||
if err != nil {
|
||||
t.er = err
|
||||
return buf, err
|
||||
}
|
||||
t.windowHandler(buf.Len())
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// GoString is implemented by Stream so context.String() won't
|
||||
@@ -574,6 +465,7 @@ type ServerConfig struct {
|
||||
ChannelzParent *channelz.Server
|
||||
MaxHeaderListSize *uint32
|
||||
HeaderTableSize *uint32
|
||||
BufferPool mem.BufferPool
|
||||
}
|
||||
|
||||
// ConnectOptions covers all relevant options for communicating with the server.
|
||||
@@ -610,19 +502,13 @@ type ConnectOptions struct {
|
||||
ChannelzParent *channelz.SubChannel
|
||||
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
|
||||
MaxHeaderListSize *uint32
|
||||
// UseProxy specifies if a proxy should be used.
|
||||
UseProxy bool
|
||||
// The mem.BufferPool to use when reading/writing to the wire.
|
||||
BufferPool mem.BufferPool
|
||||
}
|
||||
|
||||
// NewClientTransport establishes the transport with the required ConnectOptions
|
||||
// and returns it to the caller.
|
||||
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
|
||||
}
|
||||
|
||||
// Options provides additional hints and information for message
|
||||
// WriteOptions provides additional hints and information for message
|
||||
// transmission.
|
||||
type Options struct {
|
||||
type WriteOptions struct {
|
||||
// Last indicates whether this write is the last piece for
|
||||
// this stream.
|
||||
Last bool
|
||||
@@ -671,18 +557,8 @@ type ClientTransport interface {
|
||||
// It does not block.
|
||||
GracefulClose()
|
||||
|
||||
// Write sends the data for the given stream. A nil stream indicates
|
||||
// the write is to be performed on the transport as a whole.
|
||||
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
|
||||
|
||||
// NewStream creates a Stream for an RPC.
|
||||
NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
|
||||
|
||||
// CloseStream clears the footprint of a stream when the stream is
|
||||
// not needed any more. The err indicates the error incurred when
|
||||
// CloseStream is called. Must be called when a stream is finished
|
||||
// unless the associated transport is closing.
|
||||
CloseStream(stream *Stream, err error)
|
||||
NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)
|
||||
|
||||
// Error returns a channel that is closed when some I/O error
|
||||
// happens. Typically the caller should have a goroutine to monitor
|
||||
@@ -702,12 +578,6 @@ type ClientTransport interface {
|
||||
|
||||
// RemoteAddr returns the remote network address.
|
||||
RemoteAddr() net.Addr
|
||||
|
||||
// IncrMsgSent increments the number of message sent through this transport.
|
||||
IncrMsgSent()
|
||||
|
||||
// IncrMsgRecv increments the number of message received through this transport.
|
||||
IncrMsgRecv()
|
||||
}
|
||||
|
||||
// ServerTransport is the common interface for all gRPC server-side transport
|
||||
@@ -717,19 +587,7 @@ type ClientTransport interface {
|
||||
// Write methods for a given Stream will be called serially.
|
||||
type ServerTransport interface {
|
||||
// HandleStreams receives incoming streams using the given handler.
|
||||
HandleStreams(context.Context, func(*Stream))
|
||||
|
||||
// WriteHeader sends the header metadata for the given stream.
|
||||
// WriteHeader may not be called on all streams.
|
||||
WriteHeader(s *Stream, md metadata.MD) error
|
||||
|
||||
// Write sends the data for the given stream.
|
||||
// Write may not be called on all streams.
|
||||
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
|
||||
|
||||
// WriteStatus sends the status of a stream to the client. WriteStatus is
|
||||
// the final call made on a stream and always occurs.
|
||||
WriteStatus(s *Stream, st *status.Status) error
|
||||
HandleStreams(context.Context, func(*ServerStream))
|
||||
|
||||
// Close tears down the transport. Once it is called, the transport
|
||||
// should not be accessed any more. All the pending streams and their
|
||||
@@ -741,12 +599,14 @@ type ServerTransport interface {
|
||||
|
||||
// Drain notifies the client this ServerTransport stops accepting new RPCs.
|
||||
Drain(debugData string)
|
||||
}
|
||||
|
||||
// IncrMsgSent increments the number of message sent through this transport.
|
||||
IncrMsgSent()
|
||||
|
||||
// IncrMsgRecv increments the number of message received through this transport.
|
||||
IncrMsgRecv()
|
||||
type internalServerTransport interface {
|
||||
ServerTransport
|
||||
writeHeader(s *ServerStream, md metadata.MD) error
|
||||
write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
|
||||
writeStatus(s *ServerStream, st *status.Status) error
|
||||
incrMsgRecv()
|
||||
}
|
||||
|
||||
// connectionErrorf creates an ConnectionError with the specified error description.
|
||||
@@ -798,7 +658,7 @@ var (
|
||||
// connection is draining. This could be caused by goaway or balancer
|
||||
// removing the address.
|
||||
errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
|
||||
// errStreamDone is returned from write at the client side to indiacte application
|
||||
// errStreamDone is returned from write at the client side to indicate application
|
||||
// layer of an error.
|
||||
errStreamDone = errors.New("the stream is done")
|
||||
// StatusGoAway indicates that the server sent a GOAWAY that included this
|
||||
|
Reference in New Issue
Block a user