diff --git a/src/backend/booster/bk_dist/common/longtcp/session.go b/src/backend/booster/bk_dist/common/longtcp/session.go index 47064cab..6aa864f4 100644 --- a/src/backend/booster/bk_dist/common/longtcp/session.go +++ b/src/backend/booster/bk_dist/common/longtcp/session.go @@ -287,6 +287,11 @@ func (s *Session) copyMessages() []*Message { ret := make([]*Message, len(s.sendQueue), len(s.sendQueue)) copy(ret, s.sendQueue) blog.Debugf("[longtcp] copied %d messages", len(ret)) + ids := []string{} + for _, msg := range s.sendQueue { + ids = append(ids, msg.Desc()) + } + blog.Infof("[longtcp] [trace message] %v copied to real send queue", ids) s.sendQueue = make([]*Message, 0, 10) return ret } else { @@ -302,7 +307,9 @@ func (s *Session) putWait(msg *Message) error { if s.waitMap != nil { msg.WaitStart = time.Now() s.waitMap[msg.TCPHead.UniqID] = msg + blog.Infof("[longtcp] [trace message] [%s] put to wait queue", msg.Desc()) } else { + blog.Warnf("[longtcp] [trace message] [%s] session invalid when put to wait queue", msg.Desc()) return ErrorConnectionInvalid } @@ -330,6 +337,7 @@ func (s *Session) returnWait(ret *MessageResult) error { if m, ok := s.waitMap[ret.TCPHead.UniqID]; ok { m.RetChan <- ret delete(s.waitMap, ret.TCPHead.UniqID) + blog.Infof("[longtcp] [trace message] [%s] notified with response data ", ret.TCPHead.UniqID) return nil } } @@ -348,8 +356,8 @@ func (s *Session) uniqid() uint64 { return id } -func formatID(id uint64) MessageID { - data := fmt.Sprintf("UNIQID%26x", id) +func formatID(id uint64, localport int32) MessageID { + data := fmt.Sprintf("UNIQID_%011d_%013d", localport, id) return MessageID(data) } @@ -370,7 +378,7 @@ func (s *Session) encData2Message( return &Message{ TCPHead: &LongTCPHead{ - UniqID: formatID(s.uniqid()), + UniqID: formatID(s.uniqid(), s.client.LocalPort()), DataLen: totallen, }, Data: data, @@ -402,6 +410,7 @@ func (s *Session) onMessageError(m *Message, err error) { s.removeWait(m) } + blog.Warnf("[longtcp] [trace message] [%s] notified with error:%v", m.Desc(), err) m.RetChan <- &MessageResult{ Err: err, Data: nil, @@ -420,6 +429,7 @@ func (s *Session) sendReal() { if m.F != nil { m.F() } + blog.Warnf("[longtcp] [trace message] [%s] notified with error:%v", m.Desc(), err) m.RetChan <- &MessageResult{ Err: err, Data: nil, @@ -482,6 +492,7 @@ func (s *Session) sendReal() { blog.Debugf("[longtcp] session[%s] real sent body with ID [%s] ", s.Desc(), m.Desc()) if !m.WaitResponse { + blog.Infof("[longtcp] [trace message] [%s] notified after sent", m.Desc()) m.RetChan <- &MessageResult{ Err: nil, Data: nil, @@ -533,7 +544,8 @@ func (s *Session) receiveRoutine(wg *sync.WaitGroup) { return } - blog.Debugf("[longtcp] session[%s] received %d data", s.Desc(), recvlen) + // blog.Debugf("[longtcp] session[%s] received %d data", s.Desc(), recvlen) + blog.Infof("[longtcp] [trace message] [%s] received response data", head.UniqID) // TODO : decode msg, and call funtions to deal, and return response ret := &MessageResult{ Err: nil, @@ -573,6 +585,7 @@ func (s *Session) notifyAndWait(msg *Message) *MessageResult { s.sendMutex.Lock() if !s.valid { + blog.Warnf("[longtcp] [trace message] [%s] session invalid when append to send queue", msg.Desc()) s.sendMutex.Unlock() return &MessageResult{ Err: ErrorConnectionInvalid, @@ -581,6 +594,8 @@ func (s *Session) notifyAndWait(msg *Message) *MessageResult { } s.sendQueue = append(s.sendQueue, msg) + blog.Infof("[longtcp] [trace message] [%s] appended to send queue", msg.Desc()) + s.sendMutex.Unlock() blog.Debugf("[longtcp] notify by chan now, total %d messages now", len(s.sendQueue)) @@ -598,6 +613,7 @@ func (s *Session) Send( waitsecs int32, f OnSendDoneFunc) *MessageResult { if !s.valid { + blog.Warnf("[longtcp] [trace message] connection invalid when send") return &MessageResult{ Err: ErrorConnectionInvalid, Data: nil, @@ -619,6 +635,7 @@ func (s *Session) Send( // 如果 waitresponse为true,则需要等待返回的结果 func (s *Session) SendWithID(id MessageID, data [][]byte, waitresponse bool) *MessageResult { if !s.valid { + blog.Warnf("[longtcp] [trace message] [%s] connection invalid when send", id) return &MessageResult{ Err: ErrorConnectionInvalid, Data: nil, @@ -707,12 +724,15 @@ func (s *Session) check(wg *sync.WaitGroup) { // 清理资源,包括关闭连接,停止协程等 func (s *Session) Clean(err error) { - blog.Debugf("[longtcp] session %s clean now", s.Desc()) + // blog.Debugf("[longtcp] session %s clean now", s.Desc()) + blog.Infof("[longtcp] [trace message] [session:%s] ready clean now", s.Desc()) s.cancel() // 通知发送队列中的任务 s.sendMutex.Lock() + blog.Infof("[longtcp] [trace message] [session:%s] has %d in send queue when clean", + s.Desc(), len(s.sendQueue)) if !s.valid { // 避免重复clean blog.Debugf("[longtcp] session %s has cleaned before", s.Desc()) return @@ -720,6 +740,8 @@ func (s *Session) Clean(err error) { s.valid = false } for _, m := range s.sendQueue { + blog.Warnf("[longtcp] [trace message] [session:%s] [%s] notified in send queue with error:%v", + s.Desc(), m.Desc(), err) m.RetChan <- &MessageResult{ Err: err, Data: nil, @@ -730,7 +752,11 @@ func (s *Session) Clean(err error) { // 通知等待结果的队列中的任务 s.waitMutex.Lock() + blog.Infof("[longtcp] [trace message] [session:%s] has %d in wait queue when clean", + s.Desc(), len(s.waitMap)) for _, m := range s.waitMap { + blog.Warnf("[longtcp] [trace message] [session:%s] [%s] notified in wait queue with error:%v", + s.Desc(), m.Desc(), err) m.RetChan <- &MessageResult{ Err: err, Data: nil, @@ -751,9 +777,9 @@ func (s *Session) checkWaitTimeout() { for _, m := range s.waitMap { if m.MaxWaitSecs > 0 && m.WaitStart.Add(time.Duration(m.MaxWaitSecs)*time.Second).Before(time.Now()) { - blog.Infof("[longtcp] session %s found message with id:[%v] timeout with %d seconds", + blog.Infof("[longtcp] [trace message] [%s] found message in session:[%s] timeout with %d seconds", + m.Desc(), s.Desc(), - m.TCPHead.UniqID, m.MaxWaitSecs) delete(s.waitMap, m.TCPHead.UniqID) diff --git a/src/backend/booster/bk_dist/common/longtcp/tcpclient.go b/src/backend/booster/bk_dist/common/longtcp/tcpclient.go index f5fdd5f2..2c06d78b 100644 --- a/src/backend/booster/bk_dist/common/longtcp/tcpclient.go +++ b/src/backend/booster/bk_dist/common/longtcp/tcpclient.go @@ -35,6 +35,9 @@ const ( type TCPClient struct { timeout int conn *net.TCPConn + + desc string + localport int32 } // NewTCPClient return new TCPClient @@ -44,7 +47,8 @@ func NewTCPClient(timeout int) *TCPClient { } return &TCPClient{ - timeout: timeout, + timeout: timeout, + localport: -1, } } @@ -57,8 +61,9 @@ func NewTCPClientWithConn(conn *net.TCPConn) *TCPClient { } return &TCPClient{ - conn: conn, - timeout: DefaultLongTCPTimeoutSeconds, + conn: conn, + timeout: DefaultLongTCPTimeoutSeconds, + localport: -1, } } @@ -356,5 +361,25 @@ func (c *TCPClient) ConnDesc() string { return "" } - return fmt.Sprintf("%s->%s", c.conn.LocalAddr().String(), c.conn.RemoteAddr().String()) + if c.desc != "" { + return c.desc + } + + c.desc = fmt.Sprintf("%s->%s", c.conn.LocalAddr().String(), c.conn.RemoteAddr().String()) + return c.desc +} + +func (c *TCPClient) LocalPort() int32 { + if c.localport != -1 { + return c.localport + } + + localAddr, ok := c.conn.LocalAddr().(*net.TCPAddr) + if !ok { + c.localport = 0 + } else { + c.localport = int32(localAddr.Port) + } + + return c.localport } diff --git a/src/backend/booster/bk_dist/common/pump/pump.go b/src/backend/booster/bk_dist/common/pump/pump.go index 6d011bc6..94ec10d2 100644 --- a/src/backend/booster/bk_dist/common/pump/pump.go +++ b/src/backend/booster/bk_dist/common/pump/pump.go @@ -156,7 +156,8 @@ func IsPump(env *env.Sandbox) bool { } func SupportPump(env *env.Sandbox) bool { - return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin") + return IsPump(env) + // return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin") // return IsPump(env) && runtime.GOOS == "windows" } diff --git a/src/backend/booster/server/pkg/resource/crm/manager.go b/src/backend/booster/server/pkg/resource/crm/manager.go index 0dc2731f..7df224f5 100644 --- a/src/backend/booster/server/pkg/resource/crm/manager.go +++ b/src/backend/booster/server/pkg/resource/crm/manager.go @@ -513,6 +513,12 @@ func (rm *resourceManager) isFinishDeploying(resourceID, user string) bool { } } +func (rm *resourceManager) updateNoReadyInfo(r *resource, cleannum int, leftnum int, caller string) { + r.noReadyInstance = leftnum + rm.updateResourcesCache(r) + go rm.releaseNoReadyInstance(r.resourceBlockKey, cleannum, caller) +} + func (rm *resourceManager) freshDeployingStatus(resourceID, user string, ready int, terminated bool) { rm.lockResource(resourceID) defer rm.unlockResource(resourceID) @@ -528,14 +534,15 @@ func (rm *resourceManager) freshDeployingStatus(resourceID, user string, ready i } if terminated { - go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID) - r.noReadyInstance = 0 + if r.noReadyInstance > 0 { + rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID) + } } else { // the newest no ready num = resource request instance - the newest ready num currentNoReady := r.requestInstance - ready if r.noReadyInstance > currentNoReady && currentNoReady >= 0 { - go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance-currentNoReady, resourceID) - r.noReadyInstance = currentNoReady + cleannum := r.noReadyInstance - currentNoReady + rm.updateNoReadyInfo(r, cleannum, currentNoReady, resourceID) } } @@ -935,7 +942,9 @@ func (rm *resourceManager) realLaunch( blog.Errorf("crm: launch service with resource(%s) for user(%s) failed: %v", resourceID, user, err) // if launch failed, clean the dirty data in noReadyInstance - go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID) + if r.noReadyInstance > 0 { + rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID) + } return err } } @@ -1023,7 +1032,10 @@ func (rm *resourceManager) scale(resourceID, user string, function op.InstanceFi resourceID, r.requestInstance, targetInstance, user, err) // if scale failed, clean the dirty data in noReadyInstance - go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID) + if r.noReadyInstance > 0 { + rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID) + } + return err } @@ -1035,7 +1047,9 @@ func (rm *resourceManager) scale(resourceID, user string, function op.InstanceFi blog.Errorf("crm: try scaling service, save resource(%s) for user(%s) failed: %v", resourceID, user, err) // if save resource failed, clean the dirty data in noReadyInstance - go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID) + if r.noReadyInstance > 0 { + rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID) + } return err } @@ -1087,8 +1101,7 @@ func (rm *resourceManager) release(resourceID, user string) error { } if r.noReadyInstance > 0 { - go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID) - r.noReadyInstance = 0 + rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID) } r.status = resourceStatusReleased if err = rm.saveResources(r); err != nil {