Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev tming p2p #259

Merged
merged 6 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions src/backend/booster/bk_dist/common/longtcp/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ func (s *Session) copyMessages() []*Message {
ret := make([]*Message, len(s.sendQueue), len(s.sendQueue))
copy(ret, s.sendQueue)
blog.Debugf("[longtcp] copied %d messages", len(ret))
ids := []string{}
for _, msg := range s.sendQueue {
ids = append(ids, msg.Desc())
}
blog.Infof("[longtcp] [trace message] %v copied to real send queue", ids)
s.sendQueue = make([]*Message, 0, 10)
return ret
} else {
Expand All @@ -302,7 +307,9 @@ func (s *Session) putWait(msg *Message) error {
if s.waitMap != nil {
msg.WaitStart = time.Now()
s.waitMap[msg.TCPHead.UniqID] = msg
blog.Infof("[longtcp] [trace message] [%s] put to wait queue", msg.Desc())
} else {
blog.Warnf("[longtcp] [trace message] [%s] session invalid when put to wait queue", msg.Desc())
return ErrorConnectionInvalid
}

Expand Down Expand Up @@ -330,6 +337,7 @@ func (s *Session) returnWait(ret *MessageResult) error {
if m, ok := s.waitMap[ret.TCPHead.UniqID]; ok {
m.RetChan <- ret
delete(s.waitMap, ret.TCPHead.UniqID)
blog.Infof("[longtcp] [trace message] [%s] notified with response data ", ret.TCPHead.UniqID)
return nil
}
}
Expand All @@ -348,8 +356,8 @@ func (s *Session) uniqid() uint64 {
return id
}

func formatID(id uint64) MessageID {
data := fmt.Sprintf("UNIQID%26x", id)
func formatID(id uint64, localport int32) MessageID {
data := fmt.Sprintf("UNIQID_%011d_%013d", localport, id)
return MessageID(data)
}

Expand All @@ -370,7 +378,7 @@ func (s *Session) encData2Message(

return &Message{
TCPHead: &LongTCPHead{
UniqID: formatID(s.uniqid()),
UniqID: formatID(s.uniqid(), s.client.LocalPort()),
DataLen: totallen,
},
Data: data,
Expand Down Expand Up @@ -402,6 +410,7 @@ func (s *Session) onMessageError(m *Message, err error) {
s.removeWait(m)
}

blog.Warnf("[longtcp] [trace message] [%s] notified with error:%v", m.Desc(), err)
m.RetChan <- &MessageResult{
Err: err,
Data: nil,
Expand All @@ -420,6 +429,7 @@ func (s *Session) sendReal() {
if m.F != nil {
m.F()
}
blog.Warnf("[longtcp] [trace message] [%s] notified with error:%v", m.Desc(), err)
m.RetChan <- &MessageResult{
Err: err,
Data: nil,
Expand Down Expand Up @@ -482,6 +492,7 @@ func (s *Session) sendReal() {
blog.Debugf("[longtcp] session[%s] real sent body with ID [%s] ", s.Desc(), m.Desc())

if !m.WaitResponse {
blog.Infof("[longtcp] [trace message] [%s] notified after sent", m.Desc())
m.RetChan <- &MessageResult{
Err: nil,
Data: nil,
Expand Down Expand Up @@ -533,7 +544,8 @@ func (s *Session) receiveRoutine(wg *sync.WaitGroup) {
return
}

blog.Debugf("[longtcp] session[%s] received %d data", s.Desc(), recvlen)
// blog.Debugf("[longtcp] session[%s] received %d data", s.Desc(), recvlen)
blog.Infof("[longtcp] [trace message] [%s] received response data", head.UniqID)
// TODO : decode msg, and call funtions to deal, and return response
ret := &MessageResult{
Err: nil,
Expand Down Expand Up @@ -573,6 +585,7 @@ func (s *Session) notifyAndWait(msg *Message) *MessageResult {
s.sendMutex.Lock()

if !s.valid {
blog.Warnf("[longtcp] [trace message] [%s] session invalid when append to send queue", msg.Desc())
s.sendMutex.Unlock()
return &MessageResult{
Err: ErrorConnectionInvalid,
Expand All @@ -581,6 +594,8 @@ func (s *Session) notifyAndWait(msg *Message) *MessageResult {
}

s.sendQueue = append(s.sendQueue, msg)
blog.Infof("[longtcp] [trace message] [%s] appended to send queue", msg.Desc())

s.sendMutex.Unlock()

blog.Debugf("[longtcp] notify by chan now, total %d messages now", len(s.sendQueue))
Expand All @@ -598,6 +613,7 @@ func (s *Session) Send(
waitsecs int32,
f OnSendDoneFunc) *MessageResult {
if !s.valid {
blog.Warnf("[longtcp] [trace message] connection invalid when send")
return &MessageResult{
Err: ErrorConnectionInvalid,
Data: nil,
Expand All @@ -619,6 +635,7 @@ func (s *Session) Send(
// 如果 waitresponse为true,则需要等待返回的结果
func (s *Session) SendWithID(id MessageID, data [][]byte, waitresponse bool) *MessageResult {
if !s.valid {
blog.Warnf("[longtcp] [trace message] [%s] connection invalid when send", id)
return &MessageResult{
Err: ErrorConnectionInvalid,
Data: nil,
Expand Down Expand Up @@ -707,19 +724,24 @@ func (s *Session) check(wg *sync.WaitGroup) {

// 清理资源,包括关闭连接,停止协程等
func (s *Session) Clean(err error) {
blog.Debugf("[longtcp] session %s clean now", s.Desc())
// blog.Debugf("[longtcp] session %s clean now", s.Desc())
blog.Infof("[longtcp] [trace message] [session:%s] ready clean now", s.Desc())

s.cancel()

// 通知发送队列中的任务
s.sendMutex.Lock()
blog.Infof("[longtcp] [trace message] [session:%s] has %d in send queue when clean",
s.Desc(), len(s.sendQueue))
if !s.valid { // 避免重复clean
blog.Debugf("[longtcp] session %s has cleaned before", s.Desc())
return
} else {
s.valid = false
}
for _, m := range s.sendQueue {
blog.Warnf("[longtcp] [trace message] [session:%s] [%s] notified in send queue with error:%v",
s.Desc(), m.Desc(), err)
m.RetChan <- &MessageResult{
Err: err,
Data: nil,
Expand All @@ -730,7 +752,11 @@ func (s *Session) Clean(err error) {

// 通知等待结果的队列中的任务
s.waitMutex.Lock()
blog.Infof("[longtcp] [trace message] [session:%s] has %d in wait queue when clean",
s.Desc(), len(s.waitMap))
for _, m := range s.waitMap {
blog.Warnf("[longtcp] [trace message] [session:%s] [%s] notified in wait queue with error:%v",
s.Desc(), m.Desc(), err)
m.RetChan <- &MessageResult{
Err: err,
Data: nil,
Expand All @@ -751,9 +777,9 @@ func (s *Session) checkWaitTimeout() {
for _, m := range s.waitMap {
if m.MaxWaitSecs > 0 &&
m.WaitStart.Add(time.Duration(m.MaxWaitSecs)*time.Second).Before(time.Now()) {
blog.Infof("[longtcp] session %s found message with id:[%v] timeout with %d seconds",
blog.Infof("[longtcp] [trace message] [%s] found message in session:[%s] timeout with %d seconds",
m.Desc(),
s.Desc(),
m.TCPHead.UniqID,
m.MaxWaitSecs)
delete(s.waitMap, m.TCPHead.UniqID)

Expand Down
33 changes: 29 additions & 4 deletions src/backend/booster/bk_dist/common/longtcp/tcpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
type TCPClient struct {
timeout int
conn *net.TCPConn

desc string
localport int32
}

// NewTCPClient return new TCPClient
Expand All @@ -44,7 +47,8 @@ func NewTCPClient(timeout int) *TCPClient {
}

return &TCPClient{
timeout: timeout,
timeout: timeout,
localport: -1,
}
}

Expand All @@ -57,8 +61,9 @@ func NewTCPClientWithConn(conn *net.TCPConn) *TCPClient {
}

return &TCPClient{
conn: conn,
timeout: DefaultLongTCPTimeoutSeconds,
conn: conn,
timeout: DefaultLongTCPTimeoutSeconds,
localport: -1,
}
}

Expand Down Expand Up @@ -356,5 +361,25 @@ func (c *TCPClient) ConnDesc() string {
return ""
}

return fmt.Sprintf("%s->%s", c.conn.LocalAddr().String(), c.conn.RemoteAddr().String())
if c.desc != "" {
return c.desc
}

c.desc = fmt.Sprintf("%s->%s", c.conn.LocalAddr().String(), c.conn.RemoteAddr().String())
return c.desc
}

func (c *TCPClient) LocalPort() int32 {
if c.localport != -1 {
return c.localport
}

localAddr, ok := c.conn.LocalAddr().(*net.TCPAddr)
if !ok {
c.localport = 0
} else {
c.localport = int32(localAddr.Port)
}

return c.localport
}
3 changes: 2 additions & 1 deletion src/backend/booster/bk_dist/common/pump/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func IsPump(env *env.Sandbox) bool {
}

func SupportPump(env *env.Sandbox) bool {
return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin")
return IsPump(env)
// return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin")
// return IsPump(env) && runtime.GOOS == "windows"
}

Expand Down
31 changes: 22 additions & 9 deletions src/backend/booster/server/pkg/resource/crm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,12 @@ func (rm *resourceManager) isFinishDeploying(resourceID, user string) bool {
}
}

func (rm *resourceManager) updateNoReadyInfo(r *resource, cleannum int, leftnum int, caller string) {
r.noReadyInstance = leftnum
rm.updateResourcesCache(r)
go rm.releaseNoReadyInstance(r.resourceBlockKey, cleannum, caller)
}

func (rm *resourceManager) freshDeployingStatus(resourceID, user string, ready int, terminated bool) {
rm.lockResource(resourceID)
defer rm.unlockResource(resourceID)
Expand All @@ -528,14 +534,15 @@ func (rm *resourceManager) freshDeployingStatus(resourceID, user string, ready i
}

if terminated {
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
r.noReadyInstance = 0
if r.noReadyInstance > 0 {
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}
} else {
// the newest no ready num = resource request instance - the newest ready num
currentNoReady := r.requestInstance - ready
if r.noReadyInstance > currentNoReady && currentNoReady >= 0 {
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance-currentNoReady, resourceID)
r.noReadyInstance = currentNoReady
cleannum := r.noReadyInstance - currentNoReady
rm.updateNoReadyInfo(r, cleannum, currentNoReady, resourceID)
}
}

Expand Down Expand Up @@ -935,7 +942,9 @@ func (rm *resourceManager) realLaunch(
blog.Errorf("crm: launch service with resource(%s) for user(%s) failed: %v", resourceID, user, err)

// if launch failed, clean the dirty data in noReadyInstance
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
if r.noReadyInstance > 0 {
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}
return err
}
}
Expand Down Expand Up @@ -1023,7 +1032,10 @@ func (rm *resourceManager) scale(resourceID, user string, function op.InstanceFi
resourceID, r.requestInstance, targetInstance, user, err)

// if scale failed, clean the dirty data in noReadyInstance
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
if r.noReadyInstance > 0 {
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}

return err
}

Expand All @@ -1035,7 +1047,9 @@ func (rm *resourceManager) scale(resourceID, user string, function op.InstanceFi
blog.Errorf("crm: try scaling service, save resource(%s) for user(%s) failed: %v", resourceID, user, err)

// if save resource failed, clean the dirty data in noReadyInstance
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
if r.noReadyInstance > 0 {
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}
return err
}

Expand Down Expand Up @@ -1087,8 +1101,7 @@ func (rm *resourceManager) release(resourceID, user string) error {
}

if r.noReadyInstance > 0 {
go rm.releaseNoReadyInstance(r.resourceBlockKey, r.noReadyInstance, resourceID)
r.noReadyInstance = 0
rm.updateNoReadyInfo(r, r.noReadyInstance, 0, resourceID)
}
r.status = resourceStatusReleased
if err = rm.saveResources(r); err != nil {
Expand Down
Loading