TUN-8407: Upgrade go to version 1.22.2

This commit is contained in:
João "Pisco" Fernandes
2024-05-07 11:19:58 +01:00
parent f27418044b
commit 66efd3f2bb
412 changed files with 28140 additions and 17395 deletions

View File

@@ -67,7 +67,7 @@ var (
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
// errConnIdling indicates the the connection is being closed as the channel
// errConnIdling indicates the connection is being closed as the channel
// is moving to an idle mode due to inactivity.
errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
@@ -101,11 +101,6 @@ const (
defaultReadBufSize = 32 * 1024
)
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
type defaultConfigSelector struct {
sc *ServiceConfig
}
@@ -117,13 +112,22 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
}, nil
}
// newClient returns a new client in idle mode.
func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
// NewClient creates a new gRPC "channel" for the target URI provided. No I/O
// is performed. Use of the ClientConn for RPCs will automatically cause it to
// connect. Connect may be used to manually create a connection, but for most
// users this is unnecessary.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md. e.g. to use dns
// resolver, a "dns:///" prefix should be applied to the target.
//
// The DialOptions returned by WithBlock, WithTimeout, and
// WithReturnConnectionError are ignored by this function.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
czData: new(channelzData),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
@@ -175,15 +179,15 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error)
// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
channelz.RemoveEntry(cc.channelzID)
channelz.RemoveEntry(cc.channelz.ID)
return nil, err
}
if err = cc.determineAuthority(); err != nil {
channelz.RemoveEntry(cc.channelzID)
channelz.RemoveEntry(cc.channelz.ID)
return nil, err
}
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
@@ -191,39 +195,36 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error)
return cc, nil
}
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
// Dial calls DialContext(context.Background(), target, opts...).
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
// DialContext calls NewClient and then exits idle mode. If WithBlock(true) is
// used, it calls Connect and WaitForStateChange until either the context
// expires or the state of the ClientConn is Ready.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
// One subtle difference between NewClient and Dial and DialContext is that the
// former uses "dns" as the default name resolver, while the latter use
// "passthrough" for backward compatibility. This distinction should not matter
// to most users, but could matter to legacy users that specify a custom dialer
// and expect it to receive the target string directly.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc, err := newClient(target, opts...)
// At the end of this method, we kick the channel out of idle, rather than
// waiting for the first rpc.
opts = append([]DialOption{withDefaultScheme("passthrough")}, opts...)
cc, err := NewClient(target, opts...)
if err != nil {
return nil, err
}
// We start the channel off in idle mode, but kick it out of idle now,
// instead of waiting for the first RPC. Other gRPC implementations do wait
// for the first RPC to kick the channel out of idle. But doing so would be
// a major behavior change for our users who are used to seeing the channel
// active after Dial.
//
// Taking this approach of kicking it out of idle at the end of this method
// allows us to share the code between channel creation and exiting idle
// mode. This will also make it easy for us to switch to starting the
// channel off in idle, i.e. by making newClient exported.
// instead of waiting for the first RPC. This is the legacy behavior of
// Dial.
defer func() {
if err != nil {
cc.Close()
@@ -291,17 +292,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// addTraceEvent is a helper method to add a trace event on the channel. If the
// channel is a nested one, the same event is also added on the parent channel.
func (cc *ClientConn) addTraceEvent(msg string) {
ted := &channelz.TraceEventDesc{
ted := &channelz.TraceEvent{
Desc: fmt.Sprintf("Channel %s", msg),
Severity: channelz.CtInfo,
}
if cc.dopts.channelzParentID != nil {
ted.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelzID.Int(), msg),
if cc.dopts.channelzParent != nil {
ted.Parent = &channelz.TraceEvent{
Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelz.ID, msg),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
}
type idler ClientConn
@@ -418,14 +419,15 @@ func (cc *ClientConn) validateTransportCredentials() error {
}
// channelzRegistration registers the newly created ClientConn with channelz and
// stores the returned identifier in `cc.channelzID` and `cc.csMgr.channelzID`.
// A channelz trace event is emitted for ClientConn creation. If the newly
// created ClientConn is a nested one, i.e a valid parent ClientConn ID is
// specified via a dial option, the trace event is also added to the parent.
// stores the returned identifier in `cc.channelz`. A channelz trace event is
// emitted for ClientConn creation. If the newly created ClientConn is a nested
// one, i.e a valid parent ClientConn ID is specified via a dial option, the
// trace event is also added to the parent.
//
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
func (cc *ClientConn) channelzRegistration(target string) {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
parentChannel, _ := cc.dopts.channelzParent.(*channelz.Channel)
cc.channelz = channelz.RegisterChannel(parentChannel, target)
cc.addTraceEvent("created")
}
@@ -492,11 +494,11 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr
}
// newConnectivityStateManager creates an connectivityStateManager with
// the specified id.
func newConnectivityStateManager(ctx context.Context, id *channelz.Identifier) *connectivityStateManager {
// the specified channel.
func newConnectivityStateManager(ctx context.Context, channel *channelz.Channel) *connectivityStateManager {
return &connectivityStateManager{
channelzID: id,
pubSub: grpcsync.NewPubSub(ctx),
channelz: channel,
pubSub: grpcsync.NewPubSub(ctx),
}
}
@@ -510,7 +512,7 @@ type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
channelz *channelz.Channel
pubSub *grpcsync.PubSub
}
@@ -527,9 +529,10 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
return
}
csm.state = state
csm.channelz.ChannelMetrics.State.Store(&state)
csm.pubSub.Publish(state)
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
channelz.Infof(logger, csm.channelz, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
close(csm.notifyChan)
@@ -583,12 +586,12 @@ type ClientConn struct {
cancel context.CancelFunc // Cancelled on close.
// The following are initialized at dial time, and are read-only after that.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
channelzID *channelz.Identifier // Channelz identifier for the channel.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
channelz *channelz.Channel // Channelz object.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
idlenessMgr *idle.Manager
// The following provide their own synchronization, and therefore don't
@@ -596,7 +599,6 @@ type ClientConn struct {
csMgr *connectivityStateManager
pickerWrapper *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
czData *channelzData
retryThrottler atomic.Value // Updated from service config.
// mu protects the following fields.
@@ -690,6 +692,7 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
var emptyServiceConfig *ServiceConfig
func init() {
balancer.Register(pickfirstBuilder{})
cfg := parseServiceConfig("{}")
if cfg.Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
@@ -707,15 +710,15 @@ func init() {
}
}
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
func (cc *ClientConn) maybeApplyDefaultServiceConfig() {
if cc.sc != nil {
cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
cc.applyServiceConfigAndBalancer(cc.sc, nil)
return
}
if cc.dopts.defaultServiceConfig != nil {
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})
} else {
cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
}
}
@@ -733,7 +736,7 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)
// May need to apply the initial service config in case the resolver
// doesn't support service configs, or doesn't provide a service config
// with the new addresses.
cc.maybeApplyDefaultServiceConfig(nil)
cc.maybeApplyDefaultServiceConfig()
cc.balancerWrapper.resolverError(err)
@@ -744,10 +747,10 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)
var ret error
if cc.dopts.disableServiceConfig {
channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
cc.maybeApplyDefaultServiceConfig(s.Addresses)
channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
cc.maybeApplyDefaultServiceConfig()
} else if s.ServiceConfig == nil {
cc.maybeApplyDefaultServiceConfig(s.Addresses)
cc.maybeApplyDefaultServiceConfig()
// TODO: do we need to apply a failing LB policy if there is no
// default, per the error handling design?
} else {
@@ -755,12 +758,12 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)
configSelector := iresolver.GetConfigSelector(s)
if configSelector != nil {
if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
}
} else {
configSelector = &defaultConfigSelector{sc}
}
cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
cc.applyServiceConfigAndBalancer(sc, configSelector)
} else {
ret = balancer.ErrBadResolverState
if cc.sc == nil {
@@ -775,7 +778,7 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)
var balCfg serviceconfig.LoadBalancingConfig
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
balCfg = cc.sc.lbConfig
}
bw := cc.balancerWrapper
cc.mu.Unlock()
@@ -834,22 +837,17 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
addrs: copyAddressesWithoutBalancerAttributes(addrs),
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
channelz: channelz.RegisterSubChannel(cc.channelz.ID, ""),
resetBackoff: make(chan struct{}),
stateChan: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
var err error
ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "")
if err != nil {
return nil, err
}
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
Desc: "Subchannel created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID.Int()),
Parent: &channelz.TraceEvent{
Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelz.ID),
Severity: channelz.CtInfo,
},
})
@@ -872,38 +870,27 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
ac.tearDown(err)
}
func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{
State: cc.GetState(),
Target: cc.target,
CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
}
}
// Target returns the target string of the ClientConn.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) Target() string {
return cc.target
}
// CanonicalTarget returns the canonical target string of the ClientConn.
func (cc *ClientConn) CanonicalTarget() string {
return cc.parsedTarget.String()
}
func (cc *ClientConn) incrCallsStarted() {
atomic.AddInt64(&cc.czData.callsStarted, 1)
atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
cc.channelz.ChannelMetrics.CallsStarted.Add(1)
cc.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}
func (cc *ClientConn) incrCallsSucceeded() {
atomic.AddInt64(&cc.czData.callsSucceeded, 1)
cc.channelz.ChannelMetrics.CallsSucceeded.Add(1)
}
func (cc *ClientConn) incrCallsFailed() {
atomic.AddInt64(&cc.czData.callsFailed, 1)
cc.channelz.ChannelMetrics.CallsFailed.Add(1)
}
// connect starts creating a transport.
@@ -947,7 +934,7 @@ func equalAddresses(a, b []resolver.Address) bool {
// connections or connection attempts.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
ac.mu.Lock()
channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs))
channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs))
addrs = copyAddressesWithoutBalancerAttributes(addrs)
if equalAddresses(ac.addrs, addrs) {
@@ -1067,7 +1054,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
})
}
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
if sc == nil {
// should never reach here.
return
@@ -1088,17 +1075,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
} else {
cc.retryThrottler.Store((*retryThrottler)(nil))
}
var newBalancerName string
if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) {
// No service config or no LB policy specified in config.
newBalancerName = PickFirstBalancerName
} else if cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else { // cc.sc.LB != nil
newBalancerName = *cc.sc.LB
}
cc.balancerWrapper.switchTo(newBalancerName)
}
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
@@ -1174,7 +1150,7 @@ func (cc *ClientConn) Close() error {
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from being
// deleted right away.
channelz.RemoveEntry(cc.channelzID)
channelz.RemoveEntry(cc.channelz.ID)
return nil
}
@@ -1206,8 +1182,7 @@ type addrConn struct {
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
channelzID *channelz.Identifier
czData *channelzData
channelz *channelz.SubChannel
}
// Note: this requires a lock on ac.mu.
@@ -1219,10 +1194,11 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
close(ac.stateChan)
ac.stateChan = make(chan struct{})
ac.state = s
ac.channelz.ChannelMetrics.State.Store(&s)
if lastErr == nil {
channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)
} else {
channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
}
ac.acbw.updateState(s, lastErr)
}
@@ -1335,7 +1311,7 @@ func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, c
}
ac.mu.Unlock()
channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
channelz.Infof(logger, ac.channelz, "Subchannel picks a new address %q to connect", addr.Addr)
err := ac.createTransport(ctx, addr, copts, connectDeadline)
if err == nil {
@@ -1388,7 +1364,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)
defer cancel()
copts.ChannelzParentID = ac.channelzID
copts.ChannelzParent = ac.channelz
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)
if err != nil {
@@ -1397,7 +1373,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
}
// newTr is either nil, or closed.
hcancel()
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
return err
}
@@ -1469,7 +1445,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
// The health package is not imported to set health check function.
//
// TODO: add a link to the health check doc in the error message.
channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.")
return
}
@@ -1499,9 +1475,9 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
channelz.Errorf(logger, ac.channelzID, "Health checking failed: %v", err)
channelz.Errorf(logger, ac.channelz, "Health checking failed: %v", err)
}
}
}()
@@ -1566,18 +1542,18 @@ func (ac *addrConn) tearDown(err error) {
ac.cancel()
ac.curAddr = resolver.Address{}
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
Desc: "Subchannel deleted",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID.Int()),
Parent: &channelz.TraceEvent{
Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelz.ID),
Severity: channelz.CtInfo,
},
})
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from
// being deleted right away.
channelz.RemoveEntry(ac.channelzID)
channelz.RemoveEntry(ac.channelz.ID)
ac.mu.Unlock()
// We have to release the lock before the call to GracefulClose/Close here
@@ -1604,39 +1580,6 @@ func (ac *addrConn) tearDown(err error) {
}
}
func (ac *addrConn) getState() connectivity.State {
ac.mu.Lock()
defer ac.mu.Unlock()
return ac.state
}
func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
ac.mu.Lock()
addr := ac.curAddr.Addr
ac.mu.Unlock()
return &channelz.ChannelInternalMetric{
State: ac.getState(),
Target: addr,
CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
}
}
func (ac *addrConn) incrCallsStarted() {
atomic.AddInt64(&ac.czData.callsStarted, 1)
atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
}
func (ac *addrConn) incrCallsSucceeded() {
atomic.AddInt64(&ac.czData.callsSucceeded, 1)
}
func (ac *addrConn) incrCallsFailed() {
atomic.AddInt64(&ac.czData.callsFailed, 1)
}
type retryThrottler struct {
max float64
thresh float64
@@ -1674,12 +1617,17 @@ func (rt *retryThrottler) successfulRPC() {
}
}
type channelzChannel struct {
cc *ClientConn
func (ac *addrConn) incrCallsStarted() {
ac.channelz.ChannelMetrics.CallsStarted.Add(1)
ac.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
return c.cc.channelzMetric()
func (ac *addrConn) incrCallsSucceeded() {
ac.channelz.ChannelMetrics.CallsSucceeded.Add(1)
}
func (ac *addrConn) incrCallsFailed() {
ac.channelz.ChannelMetrics.CallsFailed.Add(1)
}
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
@@ -1721,14 +1669,14 @@ func (cc *ClientConn) connectionError() error {
//
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
func (cc *ClientConn) parseTargetAndFindResolver() error {
channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
channelz.Infof(logger, cc.channelz, "original dial target is: %q", cc.target)
var rb resolver.Builder
parsedTarget, err := parseTarget(cc.target)
if err != nil {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", cc.target, err)
} else {
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %#v", parsedTarget)
channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", parsedTarget)
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb != nil {
cc.parsedTarget = parsedTarget
@@ -1740,17 +1688,22 @@ func (cc *ClientConn) parseTargetAndFindResolver() error {
// We are here because the user's dial target did not contain a scheme or
// specified an unregistered scheme. We should fallback to the default
// scheme, except when a custom dialer is specified in which case, we should
// always use passthrough scheme.
defScheme := resolver.GetDefaultScheme()
channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
// always use passthrough scheme. For either case, we need to respect any overridden
// global defaults set by the user.
defScheme := cc.dopts.defaultScheme
if internal.UserSetDefaultScheme {
defScheme = resolver.GetDefaultScheme()
}
channelz.Infof(logger, cc.channelz, "fallback to scheme %q", defScheme)
canonicalTarget := defScheme + ":///" + cc.target
parsedTarget, err = parseTarget(canonicalTarget)
if err != nil {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", canonicalTarget, err)
return err
}
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
channelz.Infof(logger, cc.channelz, "parsed dial target is: %+v", parsedTarget)
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb == nil {
return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
@@ -1772,6 +1725,8 @@ func parseTarget(target string) (resolver.Target, error) {
return resolver.Target{URL: *u}, nil
}
// encodeAuthority escapes the authority string based on valid chars defined in
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
func encodeAuthority(authority string) string {
const upperhex = "0123456789ABCDEF"
@@ -1860,29 +1815,17 @@ func (cc *ClientConn) determineAuthority() error {
}
endpoint := cc.parsedTarget.Endpoint()
target := cc.target
switch {
case authorityFromDialOption != "":
if authorityFromDialOption != "" {
cc.authority = authorityFromDialOption
case authorityFromCreds != "":
} else if authorityFromCreds != "" {
cc.authority = authorityFromCreds
case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"):
// TODO: remove when the unix resolver implements optional interface to
// return channel authority.
cc.authority = "localhost"
case strings.HasPrefix(endpoint, ":"):
} else if auth, ok := cc.resolverBuilder.(resolver.AuthorityOverrider); ok {
cc.authority = auth.OverrideAuthority(cc.parsedTarget)
} else if strings.HasPrefix(endpoint, ":") {
cc.authority = "localhost" + endpoint
default:
// TODO: Define an optional interface on the resolver builder to return
// the channel authority given the user's dial target. For resolvers
// which don't implement this interface, we will use the endpoint from
// "scheme://authority/endpoint" as the default authority.
// Escape the endpoint to handle use cases where the endpoint
// might not be a valid authority by default.
// For example an endpoint which has multiple paths like
// 'a/b/c', which is not a valid authority by default.
} else {
cc.authority = encodeAuthority(endpoint)
}
channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
return nil
}