mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-28 06:19:57 +00:00
2
vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
generated
vendored
2
vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
generated
vendored
@@ -193,6 +193,8 @@ func (gsb *Balancer) ExitIdle() {
|
||||
ei.ExitIdle()
|
||||
return
|
||||
}
|
||||
gsb.mu.Lock()
|
||||
defer gsb.mu.Unlock()
|
||||
for sc := range balToUpdate.subconns {
|
||||
sc.Connect()
|
||||
}
|
||||
|
24
vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
generated
vendored
24
vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
generated
vendored
@@ -37,30 +37,30 @@ type Logger interface {
|
||||
// binLogger is the global binary logger for the binary. One of this should be
|
||||
// built at init time from the configuration (environment variable or flags).
|
||||
//
|
||||
// It is used to get a methodLogger for each individual method.
|
||||
// It is used to get a MethodLogger for each individual method.
|
||||
var binLogger Logger
|
||||
|
||||
var grpclogLogger = grpclog.Component("binarylog")
|
||||
|
||||
// SetLogger sets the binarg logger.
|
||||
// SetLogger sets the binary logger.
|
||||
//
|
||||
// Only call this at init time.
|
||||
func SetLogger(l Logger) {
|
||||
binLogger = l
|
||||
}
|
||||
|
||||
// GetLogger gets the binarg logger.
|
||||
// GetLogger gets the binary logger.
|
||||
//
|
||||
// Only call this at init time.
|
||||
func GetLogger() Logger {
|
||||
return binLogger
|
||||
}
|
||||
|
||||
// GetMethodLogger returns the methodLogger for the given methodName.
|
||||
// GetMethodLogger returns the MethodLogger for the given methodName.
|
||||
//
|
||||
// methodName should be in the format of "/service/method".
|
||||
//
|
||||
// Each methodLogger returned by this method is a new instance. This is to
|
||||
// Each MethodLogger returned by this method is a new instance. This is to
|
||||
// generate sequence id within the call.
|
||||
func GetMethodLogger(methodName string) MethodLogger {
|
||||
if binLogger == nil {
|
||||
@@ -117,7 +117,7 @@ func (l *logger) setDefaultMethodLogger(ml *MethodLoggerConfig) error {
|
||||
|
||||
// Set method logger for "service/*".
|
||||
//
|
||||
// New methodLogger with same service overrides the old one.
|
||||
// New MethodLogger with same service overrides the old one.
|
||||
func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error {
|
||||
if _, ok := l.config.Services[service]; ok {
|
||||
return fmt.Errorf("conflicting service rules for service %v found", service)
|
||||
@@ -131,7 +131,7 @@ func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig)
|
||||
|
||||
// Set method logger for "service/method".
|
||||
//
|
||||
// New methodLogger with same method overrides the old one.
|
||||
// New MethodLogger with same method overrides the old one.
|
||||
func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error {
|
||||
if _, ok := l.config.Blacklist[method]; ok {
|
||||
return fmt.Errorf("conflicting blacklist rules for method %v found", method)
|
||||
@@ -161,11 +161,11 @@ func (l *logger) setBlacklist(method string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// getMethodLogger returns the methodLogger for the given methodName.
|
||||
// getMethodLogger returns the MethodLogger for the given methodName.
|
||||
//
|
||||
// methodName should be in the format of "/service/method".
|
||||
//
|
||||
// Each methodLogger returned by this method is a new instance. This is to
|
||||
// Each MethodLogger returned by this method is a new instance. This is to
|
||||
// generate sequence id within the call.
|
||||
func (l *logger) GetMethodLogger(methodName string) MethodLogger {
|
||||
s, m, err := grpcutil.ParseMethod(methodName)
|
||||
@@ -174,16 +174,16 @@ func (l *logger) GetMethodLogger(methodName string) MethodLogger {
|
||||
return nil
|
||||
}
|
||||
if ml, ok := l.config.Methods[s+"/"+m]; ok {
|
||||
return newMethodLogger(ml.Header, ml.Message)
|
||||
return NewTruncatingMethodLogger(ml.Header, ml.Message)
|
||||
}
|
||||
if _, ok := l.config.Blacklist[s+"/"+m]; ok {
|
||||
return nil
|
||||
}
|
||||
if ml, ok := l.config.Services[s]; ok {
|
||||
return newMethodLogger(ml.Header, ml.Message)
|
||||
return NewTruncatingMethodLogger(ml.Header, ml.Message)
|
||||
}
|
||||
if l.config.All == nil {
|
||||
return nil
|
||||
}
|
||||
return newMethodLogger(l.config.All.Header, l.config.All.Message)
|
||||
return NewTruncatingMethodLogger(l.config.All.Header, l.config.All.Message)
|
||||
}
|
||||
|
20
vendor/google.golang.org/grpc/internal/binarylog/env_config.go
generated
vendored
20
vendor/google.golang.org/grpc/internal/binarylog/env_config.go
generated
vendored
@@ -30,15 +30,15 @@ import (
|
||||
// to build a new logger and assign it to binarylog.Logger.
|
||||
//
|
||||
// Example filter config strings:
|
||||
// - "" Nothing will be logged
|
||||
// - "*" All headers and messages will be fully logged.
|
||||
// - "*{h}" Only headers will be logged.
|
||||
// - "*{m:256}" Only the first 256 bytes of each message will be logged.
|
||||
// - "Foo/*" Logs every method in service Foo
|
||||
// - "Foo/*,-Foo/Bar" Logs every method in service Foo except method /Foo/Bar
|
||||
// - "Foo/*,Foo/Bar{m:256}" Logs the first 256 bytes of each message in method
|
||||
// /Foo/Bar, logs all headers and messages in every other method in service
|
||||
// Foo.
|
||||
// - "" Nothing will be logged
|
||||
// - "*" All headers and messages will be fully logged.
|
||||
// - "*{h}" Only headers will be logged.
|
||||
// - "*{m:256}" Only the first 256 bytes of each message will be logged.
|
||||
// - "Foo/*" Logs every method in service Foo
|
||||
// - "Foo/*,-Foo/Bar" Logs every method in service Foo except method /Foo/Bar
|
||||
// - "Foo/*,Foo/Bar{m:256}" Logs the first 256 bytes of each message in method
|
||||
// /Foo/Bar, logs all headers and messages in every other method in service
|
||||
// Foo.
|
||||
//
|
||||
// If two configs exist for one certain method or service, the one specified
|
||||
// later overrides the previous config.
|
||||
@@ -57,7 +57,7 @@ func NewLoggerFromConfigString(s string) Logger {
|
||||
return l
|
||||
}
|
||||
|
||||
// fillMethodLoggerWithConfigString parses config, creates methodLogger and adds
|
||||
// fillMethodLoggerWithConfigString parses config, creates TruncatingMethodLogger and adds
|
||||
// it to the right map in the logger.
|
||||
func (l *logger) fillMethodLoggerWithConfigString(config string) error {
|
||||
// "" is invalid.
|
||||
|
19
vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
generated
vendored
19
vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
generated
vendored
@@ -52,7 +52,9 @@ type MethodLogger interface {
|
||||
Log(LogEntryConfig)
|
||||
}
|
||||
|
||||
type methodLogger struct {
|
||||
// TruncatingMethodLogger is a method logger that truncates headers and messages
|
||||
// based on configured fields.
|
||||
type TruncatingMethodLogger struct {
|
||||
headerMaxLen, messageMaxLen uint64
|
||||
|
||||
callID uint64
|
||||
@@ -61,8 +63,9 @@ type methodLogger struct {
|
||||
sink Sink // TODO(blog): make this plugable.
|
||||
}
|
||||
|
||||
func newMethodLogger(h, m uint64) *methodLogger {
|
||||
return &methodLogger{
|
||||
// NewTruncatingMethodLogger returns a new truncating method logger.
|
||||
func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
|
||||
return &TruncatingMethodLogger{
|
||||
headerMaxLen: h,
|
||||
messageMaxLen: m,
|
||||
|
||||
@@ -75,8 +78,8 @@ func newMethodLogger(h, m uint64) *methodLogger {
|
||||
|
||||
// Build is an internal only method for building the proto message out of the
|
||||
// input event. It's made public to enable other library to reuse as much logic
|
||||
// in methodLogger as possible.
|
||||
func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
|
||||
// in TruncatingMethodLogger as possible.
|
||||
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
|
||||
m := c.toProto()
|
||||
timestamp, _ := ptypes.TimestampProto(time.Now())
|
||||
m.Timestamp = timestamp
|
||||
@@ -95,11 +98,11 @@ func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
|
||||
}
|
||||
|
||||
// Log creates a proto binary log entry, and logs it to the sink.
|
||||
func (ml *methodLogger) Log(c LogEntryConfig) {
|
||||
func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
|
||||
ml.sink.Write(ml.Build(c))
|
||||
}
|
||||
|
||||
func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
|
||||
func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
|
||||
if ml.headerMaxLen == maxUInt {
|
||||
return false
|
||||
}
|
||||
@@ -129,7 +132,7 @@ func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
|
||||
return truncated
|
||||
}
|
||||
|
||||
func (ml *methodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
|
||||
func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
|
||||
if ml.messageMaxLen == maxUInt {
|
||||
return false
|
||||
}
|
||||
|
16
vendor/google.golang.org/grpc/internal/channelz/types.go
generated
vendored
16
vendor/google.golang.org/grpc/internal/channelz/types.go
generated
vendored
@@ -273,10 +273,10 @@ func (c *channel) deleteSelfFromMap() (delete bool) {
|
||||
|
||||
// deleteSelfIfReady tries to delete the channel itself from the channelz database.
|
||||
// The delete process includes two steps:
|
||||
// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
|
||||
// parent's child list.
|
||||
// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
|
||||
// will return entry not found error.
|
||||
// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
|
||||
// parent's child list.
|
||||
// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
|
||||
// will return entry not found error.
|
||||
func (c *channel) deleteSelfIfReady() {
|
||||
if !c.deleteSelfFromTree() {
|
||||
return
|
||||
@@ -381,10 +381,10 @@ func (sc *subChannel) deleteSelfFromMap() (delete bool) {
|
||||
|
||||
// deleteSelfIfReady tries to delete the subchannel itself from the channelz database.
|
||||
// The delete process includes two steps:
|
||||
// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
|
||||
// its parent's child list.
|
||||
// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
|
||||
// by id will return entry not found error.
|
||||
// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
|
||||
// its parent's child list.
|
||||
// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
|
||||
// by id will return entry not found error.
|
||||
func (sc *subChannel) deleteSelfIfReady() {
|
||||
if !sc.deleteSelfFromTree() {
|
||||
return
|
||||
|
8
vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
generated
vendored
8
vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
generated
vendored
@@ -25,11 +25,15 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
prefix = "GRPC_GO_"
|
||||
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
|
||||
prefix = "GRPC_GO_"
|
||||
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
|
||||
advertiseCompressorsStr = prefix + "ADVERTISE_COMPRESSORS"
|
||||
)
|
||||
|
||||
var (
|
||||
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
|
||||
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
|
||||
// AdvertiseCompressors is set if registered compressor should be advertised
|
||||
// ("GRPC_GO_ADVERTISE_COMPRESSORS" is not "false").
|
||||
AdvertiseCompressors = !strings.EqualFold(os.Getenv(advertiseCompressorsStr), "false")
|
||||
)
|
||||
|
36
vendor/google.golang.org/grpc/internal/envconfig/observability.go
generated
vendored
Normal file
36
vendor/google.golang.org/grpc/internal/envconfig/observability.go
generated
vendored
Normal file
@@ -0,0 +1,36 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package envconfig
|
||||
|
||||
import "os"
|
||||
|
||||
const (
|
||||
envObservabilityConfig = "GRPC_GCP_OBSERVABILITY_CONFIG"
|
||||
envObservabilityConfigFile = "GRPC_GCP_OBSERVABILITY_CONFIG_FILE"
|
||||
)
|
||||
|
||||
var (
|
||||
// ObservabilityConfig is the json configuration for the gcp/observability
|
||||
// package specified directly in the envObservabilityConfig env var.
|
||||
ObservabilityConfig = os.Getenv(envObservabilityConfig)
|
||||
// ObservabilityConfigFile is the json configuration for the
|
||||
// gcp/observability specified in a file with the location specified in
|
||||
// envObservabilityConfigFile env var.
|
||||
ObservabilityConfigFile = os.Getenv(envObservabilityConfigFile)
|
||||
)
|
8
vendor/google.golang.org/grpc/internal/envconfig/xds.go
generated
vendored
8
vendor/google.golang.org/grpc/internal/envconfig/xds.go
generated
vendored
@@ -77,16 +77,16 @@ var (
|
||||
// environment variable
|
||||
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
|
||||
// "true".
|
||||
XDSAggregateAndDNS = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
|
||||
XDSAggregateAndDNS = !strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "false")
|
||||
|
||||
// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
|
||||
// which can be disabled by setting the environment variable
|
||||
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
|
||||
XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
|
||||
// XDSOutlierDetection indicates whether outlier detection support is
|
||||
// enabled, which can be enabled by setting the environment variable
|
||||
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "true".
|
||||
XDSOutlierDetection = strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "true")
|
||||
// enabled, which can be disabled by setting the environment variable
|
||||
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false".
|
||||
XDSOutlierDetection = !strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "false")
|
||||
// XDSFederation indicates whether federation support is enabled.
|
||||
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")
|
||||
|
||||
|
2
vendor/google.golang.org/grpc/internal/grpclog/grpclog.go
generated
vendored
2
vendor/google.golang.org/grpc/internal/grpclog/grpclog.go
generated
vendored
@@ -110,7 +110,7 @@ type LoggerV2 interface {
|
||||
// This is a copy of the DepthLoggerV2 defined in the external grpclog package.
|
||||
// It is defined here to avoid a circular dependency.
|
||||
//
|
||||
// Experimental
|
||||
// # Experimental
|
||||
//
|
||||
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
|
||||
// later release.
|
||||
|
7
vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
generated
vendored
7
vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
generated
vendored
@@ -52,6 +52,13 @@ func Intn(n int) int {
|
||||
return r.Intn(n)
|
||||
}
|
||||
|
||||
// Int31n implements rand.Int31n on the grpcrand global source.
|
||||
func Int31n(n int32) int32 {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return r.Int31n(n)
|
||||
}
|
||||
|
||||
// Float64 implements rand.Float64 on the grpcrand global source.
|
||||
func Float64() float64 {
|
||||
mu.Lock()
|
||||
|
32
vendor/google.golang.org/grpc/internal/grpcsync/oncefunc.go
generated
vendored
Normal file
32
vendor/google.golang.org/grpc/internal/grpcsync/oncefunc.go
generated
vendored
Normal file
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package grpcsync
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// OnceFunc returns a function wrapping f which ensures f is only executed
|
||||
// once even if the returned function is executed multiple times.
|
||||
func OnceFunc(f func()) func() {
|
||||
var once sync.Once
|
||||
return func() {
|
||||
once.Do(f)
|
||||
}
|
||||
}
|
47
vendor/google.golang.org/grpc/internal/grpcutil/compressor.go
generated
vendored
Normal file
47
vendor/google.golang.org/grpc/internal/grpcutil/compressor.go
generated
vendored
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package grpcutil
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"google.golang.org/grpc/internal/envconfig"
|
||||
)
|
||||
|
||||
// RegisteredCompressorNames holds names of the registered compressors.
|
||||
var RegisteredCompressorNames []string
|
||||
|
||||
// IsCompressorNameRegistered returns true when name is available in registry.
|
||||
func IsCompressorNameRegistered(name string) bool {
|
||||
for _, compressor := range RegisteredCompressorNames {
|
||||
if compressor == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// RegisteredCompressors returns a string of registered compressor names
|
||||
// separated by comma.
|
||||
func RegisteredCompressors() string {
|
||||
if !envconfig.AdvertiseCompressors {
|
||||
return ""
|
||||
}
|
||||
return strings.Join(RegisteredCompressorNames, ",")
|
||||
}
|
6
vendor/google.golang.org/grpc/internal/grpcutil/method.go
generated
vendored
6
vendor/google.golang.org/grpc/internal/grpcutil/method.go
generated
vendored
@@ -25,7 +25,6 @@ import (
|
||||
|
||||
// ParseMethod splits service and method from the input. It expects format
|
||||
// "/service/method".
|
||||
//
|
||||
func ParseMethod(methodName string) (service, method string, _ error) {
|
||||
if !strings.HasPrefix(methodName, "/") {
|
||||
return "", "", errors.New("invalid method name: should start with /")
|
||||
@@ -39,6 +38,11 @@ func ParseMethod(methodName string) (service, method string, _ error) {
|
||||
return methodName[:pos], methodName[pos+1:], nil
|
||||
}
|
||||
|
||||
// baseContentType is the base content-type for gRPC. This is a valid
|
||||
// content-type on it's own, but can also include a content-subtype such as
|
||||
// "proto" as a suffix after "+" or ";". See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
|
||||
// for more details.
|
||||
const baseContentType = "application/grpc"
|
||||
|
||||
// ContentSubtype returns the content-subtype for the given content-type. The
|
||||
|
64
vendor/google.golang.org/grpc/internal/internal.go
generated
vendored
64
vendor/google.golang.org/grpc/internal/internal.go
generated
vendored
@@ -63,6 +63,70 @@ var (
|
||||
// xDS-enabled server invokes this method on a grpc.Server when a particular
|
||||
// listener moves to "not-serving" mode.
|
||||
DrainServerTransports interface{} // func(*grpc.Server, string)
|
||||
// AddGlobalServerOptions adds an array of ServerOption that will be
|
||||
// effective globally for newly created servers. The priority will be: 1.
|
||||
// user-provided; 2. this method; 3. default values.
|
||||
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
|
||||
// ClearGlobalServerOptions clears the array of extra ServerOption. This
|
||||
// method is useful in testing and benchmarking.
|
||||
ClearGlobalServerOptions func()
|
||||
// AddGlobalDialOptions adds an array of DialOption that will be effective
|
||||
// globally for newly created client channels. The priority will be: 1.
|
||||
// user-provided; 2. this method; 3. default values.
|
||||
AddGlobalDialOptions interface{} // func(opt ...DialOption)
|
||||
// ClearGlobalDialOptions clears the array of extra DialOption. This
|
||||
// method is useful in testing and benchmarking.
|
||||
ClearGlobalDialOptions func()
|
||||
// JoinServerOptions combines the server options passed as arguments into a
|
||||
// single server option.
|
||||
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption
|
||||
|
||||
// WithBinaryLogger returns a DialOption that specifies the binary logger
|
||||
// for a ClientConn.
|
||||
WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption
|
||||
// BinaryLogger returns a ServerOption that can set the binary logger for a
|
||||
// server.
|
||||
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption
|
||||
|
||||
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
|
||||
// the provided xds bootstrap config instead of the global configuration from
|
||||
// the supported environment variables. The resolver.Builder is meant to be
|
||||
// used in conjunction with the grpc.WithResolvers DialOption.
|
||||
//
|
||||
// Testing Only
|
||||
//
|
||||
// This function should ONLY be used for testing and may not work with some
|
||||
// other features, including the CSDS service.
|
||||
NewXDSResolverWithConfigForTesting interface{} // func([]byte) (resolver.Builder, error)
|
||||
|
||||
// RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster
|
||||
// Specifier Plugin for testing purposes, regardless of the XDSRLS environment
|
||||
// variable.
|
||||
//
|
||||
// TODO: Remove this function once the RLS env var is removed.
|
||||
RegisterRLSClusterSpecifierPluginForTesting func()
|
||||
|
||||
// UnregisterRLSClusterSpecifierPluginForTesting unregisters the RLS Cluster
|
||||
// Specifier Plugin for testing purposes. This is needed because there is no way
|
||||
// to unregister the RLS Cluster Specifier Plugin after registering it solely
|
||||
// for testing purposes using RegisterRLSClusterSpecifierPluginForTesting().
|
||||
//
|
||||
// TODO: Remove this function once the RLS env var is removed.
|
||||
UnregisterRLSClusterSpecifierPluginForTesting func()
|
||||
|
||||
// RegisterRBACHTTPFilterForTesting registers the RBAC HTTP Filter for testing
|
||||
// purposes, regardless of the RBAC environment variable.
|
||||
//
|
||||
// TODO: Remove this function once the RBAC env var is removed.
|
||||
RegisterRBACHTTPFilterForTesting func()
|
||||
|
||||
// UnregisterRBACHTTPFilterForTesting unregisters the RBAC HTTP Filter for
|
||||
// testing purposes. This is needed because there is no way to unregister the
|
||||
// HTTP Filter after registering it solely for testing purposes using
|
||||
// RegisterRBACHTTPFilterForTesting().
|
||||
//
|
||||
// TODO: Remove this function once the RBAC env var is removed.
|
||||
UnregisterRBACHTTPFilterForTesting func()
|
||||
)
|
||||
|
||||
// HealthChecker defines the signature of the client-side LB channel health checking function.
|
||||
|
5
vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
generated
vendored
5
vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
generated
vendored
@@ -49,8 +49,9 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolv
|
||||
}
|
||||
addr := resolver.Address{Addr: endpoint}
|
||||
if b.scheme == unixAbstractScheme {
|
||||
// prepend "\x00" to address for unix-abstract
|
||||
addr.Addr = "\x00" + addr.Addr
|
||||
// We can not prepend \0 as c++ gRPC does, as in Golang '@' is used to signify we do
|
||||
// not want trailing \0 in address.
|
||||
addr.Addr = "@" + addr.Addr
|
||||
}
|
||||
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}})
|
||||
return &nopResolver{}, nil
|
||||
|
8
vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go
generated
vendored
8
vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go
generated
vendored
@@ -67,10 +67,10 @@ func (bc *BalancerConfig) MarshalJSON() ([]byte, error) {
|
||||
// ServiceConfig contains a list of loadBalancingConfigs, each with a name and
|
||||
// config. This method iterates through that list in order, and stops at the
|
||||
// first policy that is supported.
|
||||
// - If the config for the first supported policy is invalid, the whole service
|
||||
// config is invalid.
|
||||
// - If the list doesn't contain any supported policy, the whole service config
|
||||
// is invalid.
|
||||
// - If the config for the first supported policy is invalid, the whole service
|
||||
// config is invalid.
|
||||
// - If the list doesn't contain any supported policy, the whole service config
|
||||
// is invalid.
|
||||
func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
|
||||
var ir intermediateBalancerConfig
|
||||
err := json.Unmarshal(b, &ir)
|
||||
|
10
vendor/google.golang.org/grpc/internal/status/status.go
generated
vendored
10
vendor/google.golang.org/grpc/internal/status/status.go
generated
vendored
@@ -164,3 +164,13 @@ func (e *Error) Is(target error) bool {
|
||||
}
|
||||
return proto.Equal(e.s.s, tse.s.s)
|
||||
}
|
||||
|
||||
// IsRestrictedControlPlaneCode returns whether the status includes a code
|
||||
// restricted for control plane usage as defined by gRFC A54.
|
||||
func IsRestrictedControlPlaneCode(s *Status) bool {
|
||||
switch s.Code() {
|
||||
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.DataLoss:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
4
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
4
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
@@ -886,9 +886,9 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
|
||||
// A data item is represented by a dataFrame, since it later translates into
|
||||
// multiple HTTP2 data frames.
|
||||
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
|
||||
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
|
||||
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
|
||||
// maximum possilbe HTTP2 frame size.
|
||||
// maximum possible HTTP2 frame size.
|
||||
|
||||
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
|
||||
// Client sends out empty data frame with endStream = true
|
||||
|
30
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
30
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
@@ -49,7 +49,7 @@ import (
|
||||
// NewServerHandlerTransport returns a ServerTransport handling gRPC
|
||||
// from inside an http.Handler. It requires that the http Server
|
||||
// supports HTTP/2.
|
||||
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
|
||||
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
|
||||
if r.ProtoMajor != 2 {
|
||||
return nil, errors.New("gRPC requires HTTP/2")
|
||||
}
|
||||
@@ -138,7 +138,7 @@ type serverHandlerTransport struct {
|
||||
// TODO make sure this is consistent across handler_server and http2_server
|
||||
contentSubtype string
|
||||
|
||||
stats stats.Handler
|
||||
stats []stats.Handler
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) Close() {
|
||||
@@ -228,10 +228,10 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
||||
})
|
||||
|
||||
if err == nil { // transport has not been closed
|
||||
if ht.stats != nil {
|
||||
// Note: The trailer fields are compressed with hpack after this call returns.
|
||||
// No WireLength field is set here.
|
||||
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
|
||||
// Note: The trailer fields are compressed with hpack after this call returns.
|
||||
// No WireLength field is set here.
|
||||
for _, sh := range ht.stats {
|
||||
sh.HandleRPC(s.Context(), &stats.OutTrailer{
|
||||
Trailer: s.trailer.Copy(),
|
||||
})
|
||||
}
|
||||
@@ -314,10 +314,10 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
if ht.stats != nil {
|
||||
for _, sh := range ht.stats {
|
||||
// Note: The header fields are compressed with hpack after this call returns.
|
||||
// No WireLength field is set here.
|
||||
ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
|
||||
sh.HandleRPC(s.Context(), &stats.OutHeader{
|
||||
Header: md.Copy(),
|
||||
Compression: s.sendCompress,
|
||||
})
|
||||
@@ -369,14 +369,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
|
||||
}
|
||||
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
|
||||
s.ctx = peer.NewContext(ctx, pr)
|
||||
if ht.stats != nil {
|
||||
s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
|
||||
for _, sh := range ht.stats {
|
||||
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
|
||||
inHeader := &stats.InHeader{
|
||||
FullMethod: s.method,
|
||||
RemoteAddr: ht.RemoteAddr(),
|
||||
Compression: s.recvCompress,
|
||||
}
|
||||
ht.stats.HandleRPC(s.ctx, inHeader)
|
||||
sh.HandleRPC(s.ctx, inHeader)
|
||||
}
|
||||
s.trReader = &transportReader{
|
||||
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
|
||||
@@ -442,10 +442,10 @@ func (ht *serverHandlerTransport) Drain() {
|
||||
// mapRecvMsgError returns the non-nil err into the appropriate
|
||||
// error value as expected by callers of *grpc.parser.recvMsg.
|
||||
// In particular, in can only be:
|
||||
// * io.EOF
|
||||
// * io.ErrUnexpectedEOF
|
||||
// * of type transport.ConnectionError
|
||||
// * an error from the status package
|
||||
// - io.EOF
|
||||
// - io.ErrUnexpectedEOF
|
||||
// - of type transport.ConnectionError
|
||||
// - an error from the status package
|
||||
func mapRecvMsgError(err error) error {
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return err
|
||||
|
269
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
269
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@@ -38,8 +38,10 @@ import (
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
icredentials "google.golang.org/grpc/internal/credentials"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
imetadata "google.golang.org/grpc/internal/metadata"
|
||||
istatus "google.golang.org/grpc/internal/status"
|
||||
"google.golang.org/grpc/internal/syscall"
|
||||
"google.golang.org/grpc/internal/transport/networktype"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@@ -78,6 +80,7 @@ type http2Client struct {
|
||||
framer *framer
|
||||
// controlBuf delivers all the control related tasks (e.g., window
|
||||
// updates, reset streams, and various settings) to the controller.
|
||||
// Do not access controlBuf with mu held.
|
||||
controlBuf *controlBuffer
|
||||
fc *trInFlow
|
||||
// The scheme used: https if TLS is on, http otherwise.
|
||||
@@ -90,7 +93,7 @@ type http2Client struct {
|
||||
kp keepalive.ClientParameters
|
||||
keepaliveEnabled bool
|
||||
|
||||
statsHandler stats.Handler
|
||||
statsHandlers []stats.Handler
|
||||
|
||||
initialWindowSize int32
|
||||
|
||||
@@ -98,17 +101,15 @@ type http2Client struct {
|
||||
maxSendHeaderListSize *uint32
|
||||
|
||||
bdpEst *bdpEstimator
|
||||
// onPrefaceReceipt is a callback that client transport calls upon
|
||||
// receiving server preface to signal that a succefull HTTP2
|
||||
// connection was established.
|
||||
onPrefaceReceipt func()
|
||||
|
||||
maxConcurrentStreams uint32
|
||||
streamQuota int64
|
||||
streamsQuotaAvailable chan struct{}
|
||||
waitingStreams uint32
|
||||
nextID uint32
|
||||
registeredCompressors string
|
||||
|
||||
// Do not access controlBuf with mu held.
|
||||
mu sync.Mutex // guard the following variables
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
@@ -192,7 +193,7 @@ func isTemporary(err error) bool {
|
||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// and starts to receive messages on it. Non-nil error returns if construction
|
||||
// fails.
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
|
||||
scheme := "http"
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
@@ -214,12 +215,35 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
}
|
||||
return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
|
||||
}
|
||||
|
||||
// Any further errors will close the underlying connection
|
||||
defer func(conn net.Conn) {
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
}
|
||||
}(conn)
|
||||
|
||||
// The following defer and goroutine monitor the connectCtx for cancelation
|
||||
// and deadline. On context expiration, the connection is hard closed and
|
||||
// this function will naturally fail as a result. Otherwise, the defer
|
||||
// waits for the goroutine to exit to prevent the context from being
|
||||
// monitored (and to prevent the connection from ever being closed) after
|
||||
// returning from this function.
|
||||
ctxMonitorDone := grpcsync.NewEvent()
|
||||
newClientCtx, newClientDone := context.WithCancel(connectCtx)
|
||||
defer func() {
|
||||
newClientDone() // Awaken the goroutine below if connectCtx hasn't expired.
|
||||
<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
|
||||
}()
|
||||
go func(conn net.Conn) {
|
||||
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
|
||||
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
|
||||
if connectCtx.Err() != nil {
|
||||
// connectCtx expired before exiting the function. Hard close the connection.
|
||||
conn.Close()
|
||||
}
|
||||
}(conn)
|
||||
|
||||
kp := opts.KeepaliveParams
|
||||
// Validate keepalive parameters.
|
||||
if kp.Time == 0 {
|
||||
@@ -251,15 +275,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
}
|
||||
}
|
||||
if transportCreds != nil {
|
||||
rawConn := conn
|
||||
// Pull the deadline from the connectCtx, which will be used for
|
||||
// timeouts in the authentication protocol handshake. Can ignore the
|
||||
// boolean as the deadline will return the zero value, which will make
|
||||
// the conn not timeout on I/O operations.
|
||||
deadline, _ := connectCtx.Deadline()
|
||||
rawConn.SetDeadline(deadline)
|
||||
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)
|
||||
rawConn.SetDeadline(time.Time{})
|
||||
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
|
||||
if err != nil {
|
||||
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
|
||||
}
|
||||
@@ -297,6 +313,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
ctxDone: ctx.Done(), // Cache Done chan.
|
||||
cancel: cancel,
|
||||
userAgent: opts.UserAgent,
|
||||
registeredCompressors: grpcutil.RegisteredCompressors(),
|
||||
conn: conn,
|
||||
remoteAddr: conn.RemoteAddr(),
|
||||
localAddr: conn.LocalAddr(),
|
||||
@@ -311,19 +328,20 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
isSecure: isSecure,
|
||||
perRPCCreds: perRPCCreds,
|
||||
kp: kp,
|
||||
statsHandler: opts.StatsHandler,
|
||||
statsHandlers: opts.StatsHandlers,
|
||||
initialWindowSize: initialWindowSize,
|
||||
onPrefaceReceipt: onPrefaceReceipt,
|
||||
nextID: 1,
|
||||
maxConcurrentStreams: defaultMaxStreamsClient,
|
||||
streamQuota: defaultMaxStreamsClient,
|
||||
streamsQuotaAvailable: make(chan struct{}, 1),
|
||||
czData: new(channelzData),
|
||||
onGoAway: onGoAway,
|
||||
onClose: onClose,
|
||||
keepaliveEnabled: keepaliveEnabled,
|
||||
bufferPool: newBufferPool(),
|
||||
onClose: onClose,
|
||||
}
|
||||
// Add peer information to the http2client context.
|
||||
t.ctx = peer.NewContext(t.ctx, t.getPeer())
|
||||
|
||||
if md, ok := addr.Metadata.(*metadata.MD); ok {
|
||||
t.md = *md
|
||||
@@ -341,15 +359,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
updateFlowControl: t.updateFlowControl,
|
||||
}
|
||||
}
|
||||
if t.statsHandler != nil {
|
||||
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||
for _, sh := range t.statsHandlers {
|
||||
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
})
|
||||
connBegin := &stats.ConnBegin{
|
||||
Client: true,
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connBegin)
|
||||
sh.HandleConn(t.ctx, connBegin)
|
||||
}
|
||||
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
|
||||
if err != nil {
|
||||
@@ -359,21 +377,32 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
t.kpDormancyCond = sync.NewCond(&t.mu)
|
||||
go t.keepalive()
|
||||
}
|
||||
// Start the reader goroutine for incoming message. Each transport has
|
||||
// a dedicated goroutine which reads HTTP2 frame from network. Then it
|
||||
// dispatches the frame to the corresponding stream entity.
|
||||
go t.reader()
|
||||
|
||||
// Start the reader goroutine for incoming messages. Each transport has a
|
||||
// dedicated goroutine which reads HTTP2 frames from the network. Then it
|
||||
// dispatches the frame to the corresponding stream entity. When the
|
||||
// server preface is received, readerErrCh is closed. If an error occurs
|
||||
// first, an error is pushed to the channel. This must be checked before
|
||||
// returning from this function.
|
||||
readerErrCh := make(chan error, 1)
|
||||
go t.reader(readerErrCh)
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = <-readerErrCh
|
||||
}
|
||||
if err != nil {
|
||||
t.Close(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Send connection preface to server.
|
||||
n, err := t.conn.Write(clientPreface)
|
||||
if err != nil {
|
||||
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
|
||||
t.Close(err)
|
||||
return nil, err
|
||||
}
|
||||
if n != len(clientPreface) {
|
||||
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
|
||||
t.Close(err)
|
||||
return nil, err
|
||||
}
|
||||
var ss []http2.Setting
|
||||
@@ -393,14 +422,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
err = t.framer.fr.WriteSettings(ss...)
|
||||
if err != nil {
|
||||
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
|
||||
t.Close(err)
|
||||
return nil, err
|
||||
}
|
||||
// Adjust the connection flow control window if needed.
|
||||
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
|
||||
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
|
||||
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
|
||||
t.Close(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@@ -467,7 +494,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||
func (t *http2Client) getPeer() *peer.Peer {
|
||||
return &peer.Peer{
|
||||
Addr: t.remoteAddr,
|
||||
AuthInfo: t.authInfo,
|
||||
AuthInfo: t.authInfo, // Can be nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -503,9 +530,22 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
|
||||
}
|
||||
|
||||
registeredCompressors := t.registeredCompressors
|
||||
if callHdr.SendCompress != "" {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
|
||||
// Include the outgoing compressor name when compressor is not registered
|
||||
// via encoding.RegisterCompressor. This is possible when client uses
|
||||
// WithCompressor dial option.
|
||||
if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
|
||||
if registeredCompressors != "" {
|
||||
registeredCompressors += ","
|
||||
}
|
||||
registeredCompressors += callHdr.SendCompress
|
||||
}
|
||||
}
|
||||
|
||||
if registeredCompressors != "" {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
|
||||
}
|
||||
if dl, ok := ctx.Deadline(); ok {
|
||||
// Send out timeout regardless its value. The server can detect timeout context by itself.
|
||||
@@ -585,7 +625,11 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
|
||||
for _, c := range t.perRPCCreds {
|
||||
data, err := c.GetRequestMetadata(ctx, audience)
|
||||
if err != nil {
|
||||
if _, ok := status.FromError(err); ok {
|
||||
if st, ok := status.FromError(err); ok {
|
||||
// Restrict the code to the list allowed by gRFC A54.
|
||||
if istatus.IsRestrictedControlPlaneCode(st) {
|
||||
err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -614,7 +658,14 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
|
||||
}
|
||||
data, err := callCreds.GetRequestMetadata(ctx, audience)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "transport: %v", err)
|
||||
if st, ok := status.FromError(err); ok {
|
||||
// Restrict the code to the list allowed by gRFC A54.
|
||||
if istatus.IsRestrictedControlPlaneCode(st) {
|
||||
err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
|
||||
}
|
||||
callAuthData = make(map[string]string, len(data))
|
||||
for k, v := range data {
|
||||
@@ -630,13 +681,13 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
|
||||
// NewStream errors result in transparent retry, as they mean nothing went onto
|
||||
// the wire. However, there are two notable exceptions:
|
||||
//
|
||||
// 1. If the stream headers violate the max header list size allowed by the
|
||||
// server. It's possible this could succeed on another transport, even if
|
||||
// it's unlikely, but do not transparently retry.
|
||||
// 2. If the credentials errored when requesting their headers. In this case,
|
||||
// it's possible a retry can fix the problem, but indefinitely transparently
|
||||
// retrying is not appropriate as it is likely the credentials, if they can
|
||||
// eventually succeed, would need I/O to do so.
|
||||
// 1. If the stream headers violate the max header list size allowed by the
|
||||
// server. It's possible this could succeed on another transport, even if
|
||||
// it's unlikely, but do not transparently retry.
|
||||
// 2. If the credentials errored when requesting their headers. In this case,
|
||||
// it's possible a retry can fix the problem, but indefinitely transparently
|
||||
// retrying is not appropriate as it is likely the credentials, if they can
|
||||
// eventually succeed, would need I/O to do so.
|
||||
type NewStreamError struct {
|
||||
Err error
|
||||
|
||||
@@ -685,7 +736,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
cleanup(err)
|
||||
return err
|
||||
}
|
||||
t.activeStreams[id] = s
|
||||
if channelz.IsOn() {
|
||||
atomic.AddInt64(&t.czData.streamsStarted, 1)
|
||||
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
|
||||
@@ -719,6 +769,13 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
t.nextID += 2
|
||||
s.id = h.streamID
|
||||
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
|
||||
t.mu.Lock()
|
||||
if t.activeStreams == nil { // Can be niled from Close().
|
||||
t.mu.Unlock()
|
||||
return false // Don't create a stream if the transport is already closed.
|
||||
}
|
||||
t.activeStreams[s.id] = s
|
||||
t.mu.Unlock()
|
||||
if t.streamQuota > 0 && t.waitingStreams > 0 {
|
||||
select {
|
||||
case t.streamsQuotaAvailable <- struct{}{}:
|
||||
@@ -744,13 +801,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
}
|
||||
for {
|
||||
success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
|
||||
if !checkForStreamQuota(it) {
|
||||
return false
|
||||
}
|
||||
if !checkForHeaderListSize(it) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
return checkForHeaderListSize(it) && checkForStreamQuota(it)
|
||||
}, hdr)
|
||||
if err != nil {
|
||||
// Connection closed.
|
||||
@@ -773,24 +824,27 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
|
||||
}
|
||||
}
|
||||
if t.statsHandler != nil {
|
||||
if len(t.statsHandlers) != 0 {
|
||||
header, ok := metadata.FromOutgoingContext(ctx)
|
||||
if ok {
|
||||
header.Set("user-agent", t.userAgent)
|
||||
} else {
|
||||
header = metadata.Pairs("user-agent", t.userAgent)
|
||||
}
|
||||
// Note: The header fields are compressed with hpack after this call returns.
|
||||
// No WireLength field is set here.
|
||||
outHeader := &stats.OutHeader{
|
||||
Client: true,
|
||||
FullMethod: callHdr.Method,
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
Compression: callHdr.SendCompress,
|
||||
Header: header,
|
||||
for _, sh := range t.statsHandlers {
|
||||
// Note: The header fields are compressed with hpack after this call returns.
|
||||
// No WireLength field is set here.
|
||||
// Note: Creating a new stats object to prevent pollution.
|
||||
outHeader := &stats.OutHeader{
|
||||
Client: true,
|
||||
FullMethod: callHdr.Method,
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
Compression: callHdr.SendCompress,
|
||||
Header: header,
|
||||
}
|
||||
sh.HandleRPC(s.ctx, outHeader)
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, outHeader)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
@@ -873,19 +927,15 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
||||
// Close kicks off the shutdown process of the transport. This should be called
|
||||
// only once on a transport. Once it is called, the transport should not be
|
||||
// accessed any more.
|
||||
//
|
||||
// This method blocks until the addrConn that initiated this transport is
|
||||
// re-connected. This happens because t.onClose() begins reconnect logic at the
|
||||
// addrConn level and blocks until the addrConn is successfully connected.
|
||||
func (t *http2Client) Close(err error) {
|
||||
t.mu.Lock()
|
||||
// Make sure we only Close once.
|
||||
// Make sure we only close once.
|
||||
if t.state == closing {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Call t.onClose before setting the state to closing to prevent the client
|
||||
// from attempting to create new streams ASAP.
|
||||
// Call t.onClose ASAP to prevent the client from attempting to create new
|
||||
// streams.
|
||||
t.onClose()
|
||||
t.state = closing
|
||||
streams := t.activeStreams
|
||||
@@ -916,11 +966,11 @@ func (t *http2Client) Close(err error) {
|
||||
for _, s := range streams {
|
||||
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
|
||||
}
|
||||
if t.statsHandler != nil {
|
||||
for _, sh := range t.statsHandlers {
|
||||
connEnd := &stats.ConnEnd{
|
||||
Client: true,
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connEnd)
|
||||
sh.HandleConn(t.ctx, connEnd)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1000,13 +1050,13 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
||||
// for the transport and the stream based on the current bdp
|
||||
// estimation.
|
||||
func (t *http2Client) updateFlowControl(n uint32) {
|
||||
t.mu.Lock()
|
||||
for _, s := range t.activeStreams {
|
||||
s.fc.newLimit(n)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
updateIWS := func(interface{}) bool {
|
||||
t.initialWindowSize = int32(n)
|
||||
t.mu.Lock()
|
||||
for _, s := range t.activeStreams {
|
||||
s.fc.newLimit(n)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
return true
|
||||
}
|
||||
t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
|
||||
@@ -1212,7 +1262,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
default:
|
||||
t.setGoAwayReason(f)
|
||||
close(t.goAway)
|
||||
t.controlBuf.put(&incomingGoAway{})
|
||||
defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
|
||||
// Notify the clientconn about the GOAWAY before we set the state to
|
||||
// draining, to allow the client to stop attempting to create streams
|
||||
// before disallowing new streams on this connection.
|
||||
@@ -1225,18 +1275,29 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
if upperLimit == 0 { // This is the first GoAway Frame.
|
||||
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
|
||||
}
|
||||
|
||||
t.prevGoAwayID = id
|
||||
if len(t.activeStreams) == 0 {
|
||||
t.mu.Unlock()
|
||||
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
|
||||
return
|
||||
}
|
||||
|
||||
streamsToClose := make([]*Stream, 0)
|
||||
for streamID, stream := range t.activeStreams {
|
||||
if streamID > id && streamID <= upperLimit {
|
||||
// The stream was unprocessed by the server.
|
||||
atomic.StoreUint32(&stream.unprocessed, 1)
|
||||
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
|
||||
if streamID > id && streamID <= upperLimit {
|
||||
atomic.StoreUint32(&stream.unprocessed, 1)
|
||||
streamsToClose = append(streamsToClose, stream)
|
||||
}
|
||||
}
|
||||
}
|
||||
t.prevGoAwayID = id
|
||||
active := len(t.activeStreams)
|
||||
t.mu.Unlock()
|
||||
if active == 0 {
|
||||
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
|
||||
// Called outside t.mu because closeStream can take controlBuf's mu, which
|
||||
// could induce deadlock and is not allowed.
|
||||
for _, stream := range streamsToClose {
|
||||
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1432,7 +1493,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
close(s.headerChan)
|
||||
}
|
||||
|
||||
if t.statsHandler != nil {
|
||||
for _, sh := range t.statsHandlers {
|
||||
if isHeader {
|
||||
inHeader := &stats.InHeader{
|
||||
Client: true,
|
||||
@@ -1440,14 +1501,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
Header: metadata.MD(mdata).Copy(),
|
||||
Compression: s.recvCompress,
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, inHeader)
|
||||
sh.HandleRPC(s.ctx, inHeader)
|
||||
} else {
|
||||
inTrailer := &stats.InTrailer{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
Trailer: metadata.MD(mdata).Copy(),
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, inTrailer)
|
||||
sh.HandleRPC(s.ctx, inTrailer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1464,33 +1525,35 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
|
||||
}
|
||||
|
||||
// reader runs as a separate goroutine in charge of reading data from network
|
||||
// connection.
|
||||
//
|
||||
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
|
||||
// optimal.
|
||||
// TODO(zhaoq): Check the validity of the incoming frame sequence.
|
||||
func (t *http2Client) reader() {
|
||||
defer close(t.readerDone)
|
||||
// Check the validity of server preface.
|
||||
// readServerPreface reads and handles the initial settings frame from the
|
||||
// server.
|
||||
func (t *http2Client) readServerPreface() error {
|
||||
frame, err := t.framer.fr.ReadFrame()
|
||||
if err != nil {
|
||||
err = connectionErrorf(true, err, "error reading server preface: %v", err)
|
||||
t.Close(err) // this kicks off resetTransport, so must be last before return
|
||||
return
|
||||
}
|
||||
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
|
||||
if t.keepaliveEnabled {
|
||||
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
|
||||
return connectionErrorf(true, err, "error reading server preface: %v", err)
|
||||
}
|
||||
sf, ok := frame.(*http2.SettingsFrame)
|
||||
if !ok {
|
||||
// this kicks off resetTransport, so must be last before return
|
||||
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
|
||||
return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
|
||||
}
|
||||
t.handleSettings(sf, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
// reader verifies the server preface and reads all subsequent data from
|
||||
// network connection. If the server preface is not read successfully, an
|
||||
// error is pushed to errCh; otherwise errCh is closed with no error.
|
||||
func (t *http2Client) reader(errCh chan<- error) {
|
||||
defer close(t.readerDone)
|
||||
|
||||
if err := t.readServerPreface(); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
t.onPrefaceReceipt()
|
||||
t.handleSettings(sf, true)
|
||||
close(errCh)
|
||||
if t.keepaliveEnabled {
|
||||
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// loop to keep reading incoming messages on this transport.
|
||||
for {
|
||||
|
68
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
68
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
@@ -82,7 +82,7 @@ type http2Server struct {
|
||||
// updates, reset streams, and various settings) to the controller.
|
||||
controlBuf *controlBuffer
|
||||
fc *trInFlow
|
||||
stats stats.Handler
|
||||
stats []stats.Handler
|
||||
// Keepalive and max-age parameters for the server.
|
||||
kp keepalive.ServerParameters
|
||||
// Keepalive enforcement policy.
|
||||
@@ -257,7 +257,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
fc: &trInFlow{limit: uint32(icwz)},
|
||||
state: reachable,
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
stats: config.StatsHandler,
|
||||
stats: config.StatsHandlers,
|
||||
kp: kp,
|
||||
idle: time.Now(),
|
||||
kep: kep,
|
||||
@@ -265,6 +265,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
czData: new(channelzData),
|
||||
bufferPool: newBufferPool(),
|
||||
}
|
||||
// Add peer information to the http2server context.
|
||||
t.ctx = peer.NewContext(t.ctx, t.getPeer())
|
||||
|
||||
t.controlBuf = newControlBuffer(t.done)
|
||||
if dynamicWindow {
|
||||
t.bdpEst = &bdpEstimator{
|
||||
@@ -272,13 +275,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
updateFlowControl: t.updateFlowControl,
|
||||
}
|
||||
}
|
||||
if t.stats != nil {
|
||||
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||
for _, sh := range t.stats {
|
||||
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
})
|
||||
connBegin := &stats.ConnBegin{}
|
||||
t.stats.HandleConn(t.ctx, connBegin)
|
||||
sh.HandleConn(t.ctx, connBegin)
|
||||
}
|
||||
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
|
||||
if err != nil {
|
||||
@@ -485,14 +488,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
} else {
|
||||
s.ctx, s.cancel = context.WithCancel(t.ctx)
|
||||
}
|
||||
pr := &peer.Peer{
|
||||
Addr: t.remoteAddr,
|
||||
}
|
||||
// Attach Auth info if there is any.
|
||||
if t.authInfo != nil {
|
||||
pr.AuthInfo = t.authInfo
|
||||
}
|
||||
s.ctx = peer.NewContext(s.ctx, pr)
|
||||
|
||||
// Attach the received metadata to the context.
|
||||
if len(mdata) > 0 {
|
||||
s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
|
||||
@@ -570,8 +566,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
t.adjustWindow(s, uint32(n))
|
||||
}
|
||||
s.ctx = traceCtx(s.ctx, s.method)
|
||||
if t.stats != nil {
|
||||
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
|
||||
for _, sh := range t.stats {
|
||||
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
|
||||
inHeader := &stats.InHeader{
|
||||
FullMethod: s.method,
|
||||
RemoteAddr: t.remoteAddr,
|
||||
@@ -580,7 +576,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
WireLength: int(frame.Header().Length),
|
||||
Header: metadata.MD(mdata).Copy(),
|
||||
}
|
||||
t.stats.HandleRPC(s.ctx, inHeader)
|
||||
sh.HandleRPC(s.ctx, inHeader)
|
||||
}
|
||||
s.ctxDone = s.ctx.Done()
|
||||
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
|
||||
@@ -945,15 +941,16 @@ func (t *http2Server) streamContextErr(s *Stream) error {
|
||||
|
||||
// WriteHeader sends the header metadata md back to the client.
|
||||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
if s.updateHeaderSent() {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
|
||||
s.hdrMu.Lock()
|
||||
defer s.hdrMu.Unlock()
|
||||
if s.getState() == streamDone {
|
||||
return t.streamContextErr(s)
|
||||
}
|
||||
|
||||
s.hdrMu.Lock()
|
||||
if s.updateHeaderSent() {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
|
||||
if md.Len() > 0 {
|
||||
if s.header.Len() > 0 {
|
||||
s.header = metadata.Join(s.header, md)
|
||||
@@ -962,10 +959,8 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
}
|
||||
}
|
||||
if err := t.writeHeaderLocked(s); err != nil {
|
||||
s.hdrMu.Unlock()
|
||||
return status.Convert(err).Err()
|
||||
}
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -996,14 +991,14 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
t.closeStream(s, true, http2.ErrCodeInternal, false)
|
||||
return ErrHeaderListSizeLimitViolation
|
||||
}
|
||||
if t.stats != nil {
|
||||
for _, sh := range t.stats {
|
||||
// Note: Headers are compressed with hpack after this call returns.
|
||||
// No WireLength field is set here.
|
||||
outHeader := &stats.OutHeader{
|
||||
Header: s.header.Copy(),
|
||||
Compression: s.sendCompress,
|
||||
}
|
||||
t.stats.HandleRPC(s.Context(), outHeader)
|
||||
sh.HandleRPC(s.Context(), outHeader)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1013,17 +1008,19 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
|
||||
// OK is adopted.
|
||||
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
s.hdrMu.Lock()
|
||||
defer s.hdrMu.Unlock()
|
||||
|
||||
if s.getState() == streamDone {
|
||||
return nil
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
|
||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||
// first and create a slice of that exact size.
|
||||
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
|
||||
if !s.updateHeaderSent() { // No headers have been sent.
|
||||
if len(s.header) > 0 { // Send a separate header frame.
|
||||
if err := t.writeHeaderLocked(s); err != nil {
|
||||
s.hdrMu.Unlock()
|
||||
return err
|
||||
}
|
||||
} else { // Send a trailer only response.
|
||||
@@ -1052,7 +1049,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
endStream: true,
|
||||
onWrite: t.setResetPingStrikes,
|
||||
}
|
||||
s.hdrMu.Unlock()
|
||||
|
||||
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
|
||||
if !success {
|
||||
if err != nil {
|
||||
@@ -1064,10 +1061,10 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
// Send a RST_STREAM after the trailers if the client has not already half-closed.
|
||||
rst := s.getState() == streamActive
|
||||
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
|
||||
if t.stats != nil {
|
||||
for _, sh := range t.stats {
|
||||
// Note: The trailer fields are compressed with hpack after this call returns.
|
||||
// No WireLength field is set here.
|
||||
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
|
||||
sh.HandleRPC(s.Context(), &stats.OutTrailer{
|
||||
Trailer: s.trailer.Copy(),
|
||||
})
|
||||
}
|
||||
@@ -1222,9 +1219,9 @@ func (t *http2Server) Close() {
|
||||
for _, s := range streams {
|
||||
s.cancel()
|
||||
}
|
||||
if t.stats != nil {
|
||||
for _, sh := range t.stats {
|
||||
connEnd := &stats.ConnEnd{}
|
||||
t.stats.HandleConn(t.ctx, connEnd)
|
||||
sh.HandleConn(t.ctx, connEnd)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1415,6 +1412,13 @@ func (t *http2Server) getOutFlowWindow() int64 {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) getPeer() *peer.Peer {
|
||||
return &peer.Peer{
|
||||
Addr: t.remoteAddr,
|
||||
AuthInfo: t.authInfo, // Can be nil
|
||||
}
|
||||
}
|
||||
|
||||
func getJitter(v time.Duration) time.Duration {
|
||||
if v == infinity {
|
||||
return 0
|
||||
|
34
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
34
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
@@ -20,7 +20,6 @@ package transport
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -45,14 +44,8 @@ import (
|
||||
const (
|
||||
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
|
||||
http2MaxFrameLen = 16384 // 16KB frame
|
||||
// http://http2.github.io/http2-spec/#SettingValues
|
||||
// https://httpwg.org/specs/rfc7540.html#SettingValues
|
||||
http2InitHeaderTableSize = 4096
|
||||
// baseContentType is the base content-type for gRPC. This is a valid
|
||||
// content-type on it's own, but can also include a content-subtype such as
|
||||
// "proto" as a suffix after "+" or ";". See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
|
||||
// for more details.
|
||||
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -257,13 +250,13 @@ func encodeGrpcMessage(msg string) string {
|
||||
}
|
||||
|
||||
func encodeGrpcMessageUnchecked(msg string) string {
|
||||
var buf bytes.Buffer
|
||||
var sb strings.Builder
|
||||
for len(msg) > 0 {
|
||||
r, size := utf8.DecodeRuneInString(msg)
|
||||
for _, b := range []byte(string(r)) {
|
||||
if size > 1 {
|
||||
// If size > 1, r is not ascii. Always do percent encoding.
|
||||
buf.WriteString(fmt.Sprintf("%%%02X", b))
|
||||
fmt.Fprintf(&sb, "%%%02X", b)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -272,14 +265,14 @@ func encodeGrpcMessageUnchecked(msg string) string {
|
||||
//
|
||||
// fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
|
||||
if b >= spaceByte && b <= tildeByte && b != percentByte {
|
||||
buf.WriteByte(b)
|
||||
sb.WriteByte(b)
|
||||
} else {
|
||||
buf.WriteString(fmt.Sprintf("%%%02X", b))
|
||||
fmt.Fprintf(&sb, "%%%02X", b)
|
||||
}
|
||||
}
|
||||
msg = msg[size:]
|
||||
}
|
||||
return buf.String()
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
|
||||
@@ -297,23 +290,23 @@ func decodeGrpcMessage(msg string) string {
|
||||
}
|
||||
|
||||
func decodeGrpcMessageUnchecked(msg string) string {
|
||||
var buf bytes.Buffer
|
||||
var sb strings.Builder
|
||||
lenMsg := len(msg)
|
||||
for i := 0; i < lenMsg; i++ {
|
||||
c := msg[i]
|
||||
if c == percentByte && i+2 < lenMsg {
|
||||
parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
|
||||
if err != nil {
|
||||
buf.WriteByte(c)
|
||||
sb.WriteByte(c)
|
||||
} else {
|
||||
buf.WriteByte(byte(parsed))
|
||||
sb.WriteByte(byte(parsed))
|
||||
i += 2
|
||||
}
|
||||
} else {
|
||||
buf.WriteByte(c)
|
||||
sb.WriteByte(c)
|
||||
}
|
||||
}
|
||||
return buf.String()
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
type bufWriter struct {
|
||||
@@ -322,8 +315,6 @@ type bufWriter struct {
|
||||
batchSize int
|
||||
conn net.Conn
|
||||
err error
|
||||
|
||||
onFlush func()
|
||||
}
|
||||
|
||||
func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
|
||||
@@ -360,9 +351,6 @@ func (w *bufWriter) Flush() error {
|
||||
if w.offset == 0 {
|
||||
return nil
|
||||
}
|
||||
if w.onFlush != nil {
|
||||
w.onFlush()
|
||||
}
|
||||
_, w.err = w.conn.Write(w.buf[:w.offset])
|
||||
w.offset = 0
|
||||
return w.err
|
||||
|
20
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
20
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@@ -43,6 +43,10 @@ import (
|
||||
"google.golang.org/grpc/tap"
|
||||
)
|
||||
|
||||
// ErrNoHeaders is used as a signal that a trailers only response was received,
|
||||
// and is not a real error.
|
||||
var ErrNoHeaders = errors.New("stream has no headers")
|
||||
|
||||
const logLevel = 2
|
||||
|
||||
type bufferPool struct {
|
||||
@@ -366,9 +370,15 @@ func (s *Stream) Header() (metadata.MD, error) {
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
s.waitOnHeader()
|
||||
|
||||
if !s.headerValid {
|
||||
return nil, s.status.Err()
|
||||
}
|
||||
|
||||
if s.noHeaders {
|
||||
return nil, ErrNoHeaders
|
||||
}
|
||||
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
|
||||
@@ -523,7 +533,7 @@ type ServerConfig struct {
|
||||
ConnectionTimeout time.Duration
|
||||
Credentials credentials.TransportCredentials
|
||||
InTapHandle tap.ServerInHandle
|
||||
StatsHandler stats.Handler
|
||||
StatsHandlers []stats.Handler
|
||||
KeepaliveParams keepalive.ServerParameters
|
||||
KeepalivePolicy keepalive.EnforcementPolicy
|
||||
InitialWindowSize int32
|
||||
@@ -553,8 +563,8 @@ type ConnectOptions struct {
|
||||
CredsBundle credentials.Bundle
|
||||
// KeepaliveParams stores the keepalive parameters.
|
||||
KeepaliveParams keepalive.ClientParameters
|
||||
// StatsHandler stores the handler for stats.
|
||||
StatsHandler stats.Handler
|
||||
// StatsHandlers stores the handler for stats.
|
||||
StatsHandlers []stats.Handler
|
||||
// InitialWindowSize sets the initial window size for a stream.
|
||||
InitialWindowSize int32
|
||||
// InitialConnWindowSize sets the initial window size for a connection.
|
||||
@@ -573,8 +583,8 @@ type ConnectOptions struct {
|
||||
|
||||
// NewClientTransport establishes the transport with the required ConnectOptions
|
||||
// and returns it to the caller.
|
||||
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
|
||||
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, addr, opts, onGoAway, onClose)
|
||||
}
|
||||
|
||||
// Options provides additional hints and information for message
|
||||
|
Reference in New Issue
Block a user