diff --git a/pilot/pkg/model/push_context.go b/pilot/pkg/model/push_context.go index 2c6ffb4b3c83..3d8bde22b201 100644 --- a/pilot/pkg/model/push_context.go +++ b/pilot/pkg/model/push_context.go @@ -1082,6 +1082,10 @@ func (ps *PushContext) IsClusterLocal(service *Service) bool { // This should be called before starting the push, from the thread creating // the push context. func (ps *PushContext) InitContext(env *Environment, wp *PushContextWorkerPool, oldPushContext *PushContext, pushReq *PushRequest) error { + log.Infof("[Ying] init context called, wp nil? %t", wp == nil) + if wp != nil { + log.Infof("[Ying] init context called, wp started? %t", wp.Started()) + } // Acquire a lock to ensure we don't concurrently initialize the same PushContext. // If this does happen, one thread will block then exit early from InitDone=true ps.initializeMutex.Lock() @@ -2293,6 +2297,8 @@ type PushContextWorkerPool struct { workerCount int workQueue chan func() + + started bool } func NewPushContextWorkerPool(workerCount int) *PushContextWorkerPool { @@ -2308,8 +2314,8 @@ func NewPushContextWorkerPool(workerCount int) *PushContextWorkerPool { return pool } -func (p *PushContextWorkerPool) GetWorkerCount() int { - return p.workerCount +func (p *PushContextWorkerPool) Started() bool { + return p.started } func (p *PushContextWorkerPool) PushWork(w func()) { @@ -2317,6 +2323,7 @@ func (p *PushContextWorkerPool) PushWork(w func()) { } func (p *PushContextWorkerPool) Run(stop <-chan struct{}) { + p.started = true for i := 0; i < p.workerCount; i++ { go func() { for { diff --git a/pilot/pkg/xds/ads.go b/pilot/pkg/xds/ads.go index b79178b8f585..f018c2bce93b 100644 --- a/pilot/pkg/xds/ads.go +++ b/pilot/pkg/xds/ads.go @@ -290,7 +290,7 @@ func (s *DiscoveryServer) Stream(stream DiscoveryStream) error { log.Debugf("Unauthenticated XDS: %s", peerAddr) } - log.Info("[Ying] Init context called") + log.Info("[Ying] Init context called in Stream") // InitContext returns immediately if the context was already initialized. if err = s.globalPushContext().InitContext(s.Env, s.pushContextWorkerPool, nil, nil); err != nil { // Error accessing the data - log and close, maybe a different pilot replica diff --git a/pilot/pkg/xds/delta.go b/pilot/pkg/xds/delta.go index 243a6b135133..0618102762f1 100644 --- a/pilot/pkg/xds/delta.go +++ b/pilot/pkg/xds/delta.go @@ -75,6 +75,7 @@ func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error { log.Debugf("Unauthenticated XDS: %v", peerAddr) } + log.Info("[Ying] Init context called in StreamDeltas") // InitContext returns immediately if the context was already initialized. if err = s.globalPushContext().InitContext(s.Env, s.pushContextWorkerPool, nil, nil); err != nil { // Error accessing the data - log and close, maybe a different pilot replica