Skip to content

Commit

Permalink
Merge pull request #259 from tbs60/dev_tming_p2p
Browse files Browse the repository at this point in the history
Dev tming p2p
  • Loading branch information
tming authored Jun 26, 2024
2 parents 30b27ed + 2e2aad9 commit 4f8ee29
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 21 deletions.
40 changes: 33 additions & 7 deletions src/backend/booster/bk_dist/common/longtcp/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ func (s *Session) copyMessages() []*Message {
ret := make([]*Message, len(s.sendQueue), len(s.sendQueue))
copy(ret, s.sendQueue)
blog.Debugf("[longtcp] copied %d messages", len(ret))
ids := []string{}
for _, msg := range s.sendQueue {
ids = append(ids, msg.Desc())
}
blog.Infof("[longtcp] [trace message] %v copied to real send queue", ids)
s.sendQueue = make([]*Message, 0, 10)
return ret
} else {
Expand All @@ -302,7 +307,9 @@ func (s *Session) putWait(msg *Message) error {
if s.waitMap != nil {
msg.WaitStart = time.Now()
s.waitMap[msg.TCPHead.UniqID] = msg
blog.Infof("[longtcp] [trace message] [%s] put to wait queue", msg.Desc())
} else {
blog.Warnf("[longtcp] [trace message] [%s] session invalid when put to wait queue", msg.Desc())
return ErrorConnectionInvalid
}

Expand Down Expand Up @@ -330,6 +337,7 @@ func (s *Session) returnWait(ret *MessageResult) error {
if m, ok := s.waitMap[ret.TCPHead.UniqID]; ok {
m.RetChan <- ret
delete(s.waitMap, ret.TCPHead.UniqID)
blog.Infof("[longtcp] [trace message] [%s] notified with response data ", ret.TCPHead.UniqID)
return nil
}
}
Expand All @@ -348,8 +356,8 @@ func (s *Session) uniqid() uint64 {
return id
}

func formatID(id uint64) MessageID {
data := fmt.Sprintf("UNIQID%26x", id)
// formatID builds the MessageID for a message from the sender's local port
// and a numeric id. The layout is the literal prefix "UNIQID_", the port
// zero-padded to at least 11 characters, an underscore, and the id zero-padded
// to at least 13 characters. Values wider than those fields are not truncated,
// so the result grows beyond 32 characters in that case.
func formatID(id uint64, localport int32) MessageID {
	return MessageID(fmt.Sprintf("UNIQID_%011d_%013d", localport, id))
}

Expand All @@ -370,7 +378,7 @@ func (s *Session) encData2Message(

return &Message{
TCPHead: &LongTCPHead{
UniqID: formatID(s.uniqid()),
UniqID: formatID(s.uniqid(), s.client.LocalPort()),
DataLen: totallen,
},
Data: data,
Expand Down Expand Up @@ -402,6 +410,7 @@ func (s *Session) onMessageError(m *Message, err error) {
s.removeWait(m)
}

blog.Warnf("[longtcp] [trace message] [%s] notified with error:%v", m.Desc(), err)
m.RetChan <- &MessageResult{
Err: err,
Data: nil,
Expand All @@ -420,6 +429,7 @@ func (s *Session) sendReal() {
if m.F != nil {
m.F()
}
blog.Warnf("[longtcp] [trace message] [%s] notified with error:%v", m.Desc(), err)
m.RetChan <- &MessageResult{
Err: err,
Data: nil,
Expand Down Expand Up @@ -482,6 +492,7 @@ func (s *Session) sendReal() {
blog.Debugf("[longtcp] session[%s] real sent body with ID [%s] ", s.Desc(), m.Desc())

if !m.WaitResponse {
blog.Infof("[longtcp] [trace message] [%s] notified after sent", m.Desc())
m.RetChan <- &MessageResult{
Err: nil,
Data: nil,
Expand Down Expand Up @@ -533,7 +544,8 @@ func (s *Session) receiveRoutine(wg *sync.WaitGroup) {
return
}

blog.Debugf("[longtcp] session[%s] received %d data", s.Desc(), recvlen)
// blog.Debugf("[longtcp] session[%s] received %d data", s.Desc(), recvlen)
blog.Infof("[longtcp] [trace message] [%s] received response data", head.UniqID)
// TODO: decode msg, call functions to handle it, and return the response
ret := &MessageResult{
Err: nil,
Expand Down Expand Up @@ -573,6 +585,7 @@ func (s *Session) notifyAndWait(msg *Message) *MessageResult {
s.sendMutex.Lock()

if !s.valid {
blog.Warnf("[longtcp] [trace message] [%s] session invalid when append to send queue", msg.Desc())
s.sendMutex.Unlock()
return &MessageResult{
Err: ErrorConnectionInvalid,
Expand All @@ -581,6 +594,8 @@ func (s *Session) notifyAndWait(msg *Message) *MessageResult {
}

s.sendQueue = append(s.sendQueue, msg)
blog.Infof("[longtcp] [trace message] [%s] appended to send queue", msg.Desc())

s.sendMutex.Unlock()

blog.Debugf("[longtcp] notify by chan now, total %d messages now", len(s.sendQueue))
Expand All @@ -598,6 +613,7 @@ func (s *Session) Send(
waitsecs int32,
f OnSendDoneFunc) *MessageResult {
if !s.valid {
blog.Warnf("[longtcp] [trace message] connection invalid when send")
return &MessageResult{
Err: ErrorConnectionInvalid,
Data: nil,
Expand All @@ -619,6 +635,7 @@ func (s *Session) Send(
// 如果 waitresponse为true,则需要等待返回的结果
func (s *Session) SendWithID(id MessageID, data [][]byte, waitresponse bool) *MessageResult {
if !s.valid {
blog.Warnf("[longtcp] [trace message] [%s] connection invalid when send", id)
return &MessageResult{
Err: ErrorConnectionInvalid,
Data: nil,
Expand Down Expand Up @@ -707,19 +724,24 @@ func (s *Session) check(wg *sync.WaitGroup) {

// 清理资源,包括关闭连接,停止协程等
func (s *Session) Clean(err error) {
blog.Debugf("[longtcp] session %s clean now", s.Desc())
// blog.Debugf("[longtcp] session %s clean now", s.Desc())
blog.Infof("[longtcp] [trace message] [session:%s] ready clean now", s.Desc())

s.cancel()

// 通知发送队列中的任务
s.sendMutex.Lock()
blog.Infof("[longtcp] [trace message] [session:%s] has %d in send queue when clean",
s.Desc(), len(s.sendQueue))
if !s.valid { // 避免重复clean
blog.Debugf("[longtcp] session %s has cleaned before", s.Desc())
return
} else {
s.valid = false
}
for _, m := range s.sendQueue {
blog.Warnf("[longtcp] [trace message] [session:%s] [%s] notified in send queue with error:%v",
s.Desc(), m.Desc(), err)
m.RetChan <- &MessageResult{
Err: err,
Data: nil,
Expand All @@ -730,7 +752,11 @@ func (s *Session) Clean(err error) {

// 通知等待结果的队列中的任务
s.waitMutex.Lock()
blog.Infof("[longtcp] [trace message] [session:%s] has %d in wait queue when clean",
s.Desc(), len(s.waitMap))
for _, m := range s.waitMap {
blog.Warnf("[longtcp] [trace message] [session:%s] [%s] notified in wait queue with error:%v",
s.Desc(), m.Desc(), err)
m.RetChan <- &MessageResult{
Err: err,
Data: nil,
Expand All @@ -751,9 +777,9 @@ func (s *Session) checkWaitTimeout() {
for _, m := range s.waitMap {
if m.MaxWaitSecs > 0 &&
m.WaitStart.Add(time.Duration(m.MaxWaitSecs)*time.Second).Before(time.Now()) {
blog.Infof("[longtcp] session %s found message with id:[%v] timeout with %d seconds",
blog.Infof("[longtcp] [trace message] [%s] found message in session:[%s] timeout with %d seconds",
m.Desc(),
s.Desc(),
m.TCPHead.UniqID,
m.MaxWaitSecs)
delete(s.waitMap, m.TCPHead.UniqID)

Expand Down
33 changes: 29 additions & 4 deletions src/backend/booster/bk_dist/common/longtcp/tcpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
type TCPClient struct {
// timeout is the client's timeout setting; presumably in seconds, since
// NewTCPClientWithConn fills it from DefaultLongTCPTimeoutSeconds — TODO confirm.
timeout int
// conn is the underlying TCP connection; NewTCPClient leaves it unset.
conn *net.TCPConn

// desc caches the "local->remote" address string built by ConnDesc.
desc string
// localport caches the value returned by LocalPort; the constructors
// initialise it to -1, which LocalPort treats as "not computed yet".
localport int32
}

// NewTCPClient return new TCPClient
Expand All @@ -44,7 +47,8 @@ func NewTCPClient(timeout int) *TCPClient {
}

return &TCPClient{
timeout: timeout,
timeout: timeout,
localport: -1,
}
}

Expand All @@ -57,8 +61,9 @@ func NewTCPClientWithConn(conn *net.TCPConn) *TCPClient {
}

return &TCPClient{
conn: conn,
timeout: DefaultLongTCPTimeoutSeconds,
conn: conn,
timeout: DefaultLongTCPTimeoutSeconds,
localport: -1,
}
}

Expand Down Expand Up @@ -356,5 +361,25 @@ func (c *TCPClient) ConnDesc() string {
return ""
}

return fmt.Sprintf("%s->%s", c.conn.LocalAddr().String(), c.conn.RemoteAddr().String())
if c.desc != "" {
return c.desc
}

c.desc = fmt.Sprintf("%s->%s", c.conn.LocalAddr().String(), c.conn.RemoteAddr().String())
return c.desc
}

// LocalPort returns the local TCP port of the client's connection.
//
// The value is computed once and cached in c.localport (-1 marks "not yet
// computed"). If the connection's local address is not a *net.TCPAddr, 0 is
// cached and returned. When the client has no connection at all, 0 is
// returned without caching, so a later call can still resolve the real port
// once a connection exists; previously this case dereferenced a nil conn.
func (c *TCPClient) LocalPort() int32 {
	if c.localport != -1 {
		return c.localport
	}

	// Guard against a client that has no connection attached.
	if c.conn == nil {
		return 0
	}

	c.localport = 0
	if localAddr, ok := c.conn.LocalAddr().(*net.TCPAddr); ok {
		c.localport = int32(localAddr.Port)
	}

	return c.localport
}
3 changes: 2 additions & 1 deletion src/backend/booster/bk_dist/common/pump/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func IsPump(env *env.Sandbox) bool {
}

// SupportPump returns a bool derived from IsPump(env).
// NOTE(review): this diff view lists two return statements under one header:
// the first combines IsPump(env) with a windows/darwin runtime.GOOS check, the
// second returns IsPump(env) alone. The hunk summary reports one deletion and
// two additions, so presumably the first return is the removed line and only
// the second exists in the committed file — confirm against the repository.
func SupportPump(env *env.Sandbox) bool {
return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin")
return IsPump(env)
// return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin")
// return IsPump(env) && runtime.GOOS == "windows"
}

Expand Down
31 changes: 22 additions & 9 deletions src/backend/booster/server/pkg/resource/crm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,12 @@ func (rm *resourceManager) isFinishDeploying(resourceID, user string) bool {
}
}

// updateNoReadyInfo stores leftnum as r's not-ready instance count, pushes r
// into the resources cache, and then starts a goroutine that calls
// releaseNoReadyInstance for cleannum instances of r's resource block key.
// caller is forwarded unchanged to releaseNoReadyInstance.
func (rm *resourceManager) updateNoReadyInfo(r *resource, cleannum int, leftnum int, caller string) {
	r.noReadyInstance = leftnum
	rm.updateResourcesCache(r)

	// Read the block key only after the cache update, at the same point where
	// the arguments of the asynchronous call used to be evaluated.
	blockKey := r.resourceBlockKey
	go func() {
		rm.releaseNoReadyInstance(blockKey, cleannum, caller)
	}()
}

func (rm *resourceManager) freshDeployingStatus(resourceID, user string, ready int, terminated bool) {
rm.lockResource(resourceID)
defer rm.unlockResource(resourceID)
Expand All @@ -528,14 +534,15 @@ func (rm *resourceManager) freshDeployingStatus(resourceID, user string, ready i
}

if terminated {
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
r.noReadyInstance = 0
if r.noReadyInstance > 0 {
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}
} else {
// the newest no ready num = resource request instance - the newest ready num
currentNoReady := r.requestInstance - ready
if r.noReadyInstance > currentNoReady && currentNoReady >= 0 {
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance-currentNoReady, resourceID)
r.noReadyInstance = currentNoReady
cleannum := r.noReadyInstance - currentNoReady
rm.updateNoReadyInfo(r, cleannum, currentNoReady, resourceID)
}
}

Expand Down Expand Up @@ -935,7 +942,9 @@ func (rm *resourceManager) realLaunch(
blog.Errorf("crm: launch service with resource(%s) for user(%s) failed: %v", resourceID, user, err)

// if launch failed, clean the dirty data in noReadyInstance
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
if r.noReadyInstance > 0 {
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}
return err
}
}
Expand Down Expand Up @@ -1023,7 +1032,10 @@ func (rm *resourceManager) scale(resourceID, user string, function op.InstanceFi
resourceID, r.requestInstance, targetInstance, user, err)

// if scale failed, clean the dirty data in noReadyInstance
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
if r.noReadyInstance > 0 {
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}

return err
}

Expand All @@ -1035,7 +1047,9 @@ func (rm *resourceManager) scale(resourceID, user string, function op.InstanceFi
blog.Errorf("crm: try scaling service, save resource(%s) for user(%s) failed: %v", resourceID, user, err)

// if save resource failed, clean the dirty data in noReadyInstance
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
if r.noReadyInstance > 0 {
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}
return err
}

Expand Down Expand Up @@ -1087,8 +1101,7 @@ func (rm *resourceManager) release(resourceID, user string) error {
}

if r.noReadyInstance > 0 {
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
r.noReadyInstance = 0
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}
r.status = resourceStatusReleased
if err = rm.saveResources(r); err != nil {
Expand Down

0 comments on commit 4f8ee29

Please sign in to comment.