TUN-1734: Pin packages at exact versions

This commit is contained in:
Areg Harutyunyan
2019-04-17 12:15:55 -05:00
parent 2e2fa29637
commit bab7583a97
823 changed files with 108625 additions and 22044 deletions

View File

@@ -27,16 +27,22 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/grpclog"
)
const (
defaultMaxTraceEntry int32 = 30
)
var (
db dbWrapper
idGen idGenerator
// EntryPerPage defines the number of channelz entries to be shown on a web page.
EntryPerPage = 50
curState int32
EntryPerPage = int64(50)
curState int32
maxTraceEntry = defaultMaxTraceEntry
)
// TurnOn turns on channelz data collection.
@@ -52,6 +58,22 @@ func IsOn() bool {
return atomic.CompareAndSwapInt32(&curState, 1, 1)
}
// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
// Setting it to 0 will disable channel tracing.
func SetMaxTraceEntry(i int32) {
atomic.StoreInt32(&maxTraceEntry, i)
}
// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
func ResetMaxTraceEntryToDefault() {
atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
}
func getMaxTraceEntry() int {
i := atomic.LoadInt32(&maxTraceEntry)
return int(i)
}
// dbWarpper wraps around a reference to internal channelz data storage, and
// provide synchronized functionality to set and get the reference.
type dbWrapper struct {
@@ -91,20 +113,20 @@ func NewChannelzStorage() {
// boolean indicating whether there's more top channels to be queried for.
//
// The arg id specifies that only top channel with id at or above it will be included
// in the result. The returned slice is up to a length of EntryPerPage, and is
// sorted in ascending id order.
func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
return db.get().GetTopChannels(id)
// in the result. The returned slice is up to a length of the arg maxResults or
// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
return db.get().GetTopChannels(id, maxResults)
}
// GetServers returns a slice of server's ServerMetric, along with a
// boolean indicating whether there's more servers to be queried for.
//
// The arg id specifies that only server with id at or above it will be included
// in the result. The returned slice is up to a length of EntryPerPage, and is
// sorted in ascending id order.
func GetServers(id int64) ([]*ServerMetric, bool) {
return db.get().GetServers(id)
// in the result. The returned slice is up to a length of the arg maxResults or
// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
return db.get().GetServers(id, maxResults)
}
// GetServerSockets returns a slice of server's (identified by id) normal socket's
@@ -112,10 +134,10 @@ func GetServers(id int64) ([]*ServerMetric, bool) {
// be queried for.
//
// The arg startID specifies that only sockets with id at or above it will be
// included in the result. The returned slice is up to a length of EntryPerPage,
// and is sorted in ascending id order.
func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
return db.get().GetServerSockets(id, startID)
// included in the result. The returned slice is up to a length of the arg maxResults
// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
return db.get().GetServerSockets(id, startID, maxResults)
}
// GetChannel returns the ChannelMetric for the channel (identified by id).
@@ -133,6 +155,11 @@ func GetSocket(id int64) *SocketMetric {
return db.get().GetSocket(id)
}
// GetServer returns the ServerMetric for the server (identified by id).
func GetServer(id int64) *ServerMetric {
return db.get().GetServer(id)
}
// RegisterChannel registers the given channel c in channelz database with ref
// as its reference name, and add it to the child list of its parent (identified
// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
@@ -146,6 +173,7 @@ func RegisterChannel(c Channel, pid int64, ref string) int64 {
nestedChans: make(map[int64]string),
id: id,
pid: pid,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
if pid == 0 {
db.get().addChannel(id, cn, true, pid, ref)
@@ -170,6 +198,7 @@ func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
sockets: make(map[int64]string),
id: id,
pid: pid,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
db.get().addSubChannel(id, sc, pid, ref)
return id
@@ -226,6 +255,24 @@ func RemoveEntry(id int64) {
db.get().removeEntry(id)
}
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
// to the channel trace.
// The Parent field is optional. It is used for event that will be recorded in the entity's parent
// trace also.
type TraceEventDesc struct {
Desc string
Severity Severity
Parent *TraceEventDesc
}
// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
func AddTraceEvent(id int64, desc *TraceEventDesc) {
if getMaxTraceEntry() == 0 {
return
}
db.get().traceEvent(id, desc)
}
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided in two two categories with respect to locking.
// 1. Methods acquire the global lock.
@@ -251,6 +298,7 @@ func (c *channelMap) addServer(id int64, s *server) {
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
c.mu.Lock()
cn.cm = c
cn.trace.cm = c
c.channels[id] = cn
if isTopChannel {
c.topLevelChannels[id] = struct{}{}
@@ -263,6 +311,7 @@ func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid in
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
c.mu.Lock()
sc.cm = c
sc.trace.cm = c
c.subChannels[id] = sc
c.findEntry(pid).addChild(id, sc)
c.mu.Unlock()
@@ -284,16 +333,25 @@ func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref
c.mu.Unlock()
}
// removeEntry triggers the removal of an entry, which may not indeed delete the
// entry, if it has to wait on the deletion of its children, or may lead to a chain
// of entry deletion. For example, deleting the last socket of a gracefully shutting
// down server will lead to the server being also deleted.
// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
// wait on the deletion of its children and until no other entity's channel trace references it.
// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
// shutting down server will lead to the server being also deleted.
func (c *channelMap) removeEntry(id int64) {
c.mu.Lock()
c.findEntry(id).triggerDelete()
c.mu.Unlock()
}
// c.mu must be held by the caller
func (c *channelMap) decrTraceRefCount(id int64) {
e := c.findEntry(id)
if v, ok := e.(tracedChannel); ok {
v.decrTraceRefCount()
e.deleteSelfIfReady()
}
}
// c.mu must be held by the caller.
func (c *channelMap) findEntry(id int64) entry {
var v entry
@@ -347,6 +405,39 @@ func (c *channelMap) deleteEntry(id int64) {
}
}
func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
c.mu.Lock()
child := c.findEntry(id)
childTC, ok := child.(tracedChannel)
if !ok {
c.mu.Unlock()
return
}
childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
if desc.Parent != nil {
parent := c.findEntry(child.getParentID())
var chanType RefChannelType
switch child.(type) {
case *channel:
chanType = RefChannel
case *subChannel:
chanType = RefSubChannel
}
if parentTC, ok := parent.(tracedChannel); ok {
parentTC.getChannelTrace().append(&TraceEvent{
Desc: desc.Parent.Desc,
Severity: desc.Parent.Severity,
Timestamp: time.Now(),
RefID: id,
RefName: childTC.getRefName(),
RefType: chanType,
})
childTC.incrTraceRefCount()
}
}
c.mu.Unlock()
}
type int64Slice []int64
func (s int64Slice) Len() int { return len(s) }
@@ -361,29 +452,32 @@ func copyMap(m map[int64]string) map[int64]string {
return n
}
func min(a, b int) int {
func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
c.mu.RLock()
l := len(c.topLevelChannels)
l := int64(len(c.topLevelChannels))
ids := make([]int64, 0, l)
cns := make([]*channel, 0, min(l, EntryPerPage))
cns := make([]*channel, 0, min(l, maxResults))
for k := range c.topLevelChannels {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := 0
count := int64(0)
var end bool
var t []*ChannelMetric
for i, v := range ids[idx:] {
if count == EntryPerPage {
if count == maxResults {
break
}
if cn, ok := c.channels[v]; ok {
@@ -408,25 +502,29 @@ func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
t[i].ChannelData = cn.c.ChannelzMetric()
t[i].ID = cn.id
t[i].RefName = cn.refName
t[i].Trace = cn.trace.dumpData()
}
return t, end
}
func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
c.mu.RLock()
l := len(c.servers)
l := int64(len(c.servers))
ids := make([]int64, 0, l)
ss := make([]*server, 0, min(l, EntryPerPage))
ss := make([]*server, 0, min(l, maxResults))
for k := range c.servers {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := 0
count := int64(0)
var end bool
var s []*ServerMetric
for i, v := range ids[idx:] {
if count == EntryPerPage {
if count == maxResults {
break
}
if svr, ok := c.servers[v]; ok {
@@ -454,7 +552,10 @@ func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
return s, end
}
func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
var svr *server
var ok bool
c.mu.RLock()
@@ -464,18 +565,18 @@ func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric,
return nil, true
}
svrskts := svr.sockets
l := len(svrskts)
l := int64(len(svrskts))
ids := make([]int64, 0, l)
sks := make([]*normalSocket, 0, min(l, EntryPerPage))
sks := make([]*normalSocket, 0, min(l, maxResults))
for k := range svrskts {
ids = append(ids, k)
}
sort.Sort((int64Slice(ids)))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := 0
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
count := int64(0)
var end bool
for i, v := range ids[idx:] {
if count == EntryPerPage {
if count == maxResults {
break
}
if ns, ok := c.normalSockets[v]; ok {
@@ -514,10 +615,14 @@ func (c *channelMap) GetChannel(id int64) *ChannelMetric {
}
cm.NestedChans = copyMap(cn.nestedChans)
cm.SubChans = copyMap(cn.subChans)
// cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
// holding the lock to prevent potential data race.
chanCopy := cn.c
c.mu.RUnlock()
cm.ChannelData = cn.c.ChannelzMetric()
cm.ChannelData = chanCopy.ChannelzMetric()
cm.ID = cn.id
cm.RefName = cn.refName
cm.Trace = cn.trace.dumpData()
return cm
}
@@ -532,10 +637,14 @@ func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
return nil
}
cm.Sockets = copyMap(sc.sockets)
// sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
// holding the lock to prevent potential data race.
chanCopy := sc.c
c.mu.RUnlock()
cm.ChannelData = sc.c.ChannelzMetric()
cm.ChannelData = chanCopy.ChannelzMetric()
cm.ID = sc.id
cm.RefName = sc.refName
cm.Trace = sc.trace.dumpData()
return cm
}
@@ -560,6 +669,23 @@ func (c *channelMap) GetSocket(id int64) *SocketMetric {
return nil
}
func (c *channelMap) GetServer(id int64) *ServerMetric {
sm := &ServerMetric{}
var svr *server
var ok bool
c.mu.RLock()
if svr, ok = c.servers[id]; !ok {
c.mu.RUnlock()
return nil
}
sm.ListenSockets = copyMap(svr.listenSockets)
c.mu.RUnlock()
sm.ID = svr.id
sm.RefName = svr.refName
sm.ServerData = svr.s.ChannelzMetric()
return sm
}
type idGenerator struct {
id int64
}