Skip to content

Commit

Permalink
u
Browse files Browse the repository at this point in the history
  • Loading branch information
snowinszu committed Mar 28, 2021
1 parent 7ed2d04 commit a5d5690
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 20 deletions.
7 changes: 3 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,13 @@ func (c *Client)sendData(data []byte, binary bool) error {
var resp rpcservice.RpcResp
err := node.SendMsgSignal(req, &resp)
if err != nil {
log.Warnf("SendMsgSignal to remote failed " + err.Error())
//log.Warnf("SendMsgSignal to remote failed " + err.Error())
// 节点出现问题
node.DialNode()
//node.DialNode()
return err
}
if !resp.Success {
// TODO 过滤
log.Warnf("SendMsgSignal failed reason " + resp.Reason)
//log.Warnf("SendMsgSignal failed reason " + resp.Reason)
return fmt.Errorf(resp.Reason)
}
} else {
Expand Down
1 change: 0 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: 2.2.0

log:
writers: file # 输出位置,有两个可选项 —— file 和 stdout。选择 file 会将日志记录到 logger_file 指定的日志文件中,选择 stdout 会将日志输出到标准输出,当然也可以两者同时选择
Expand Down
1 change: 0 additions & 1 deletion config_cluster.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: 2.2.0

log:
writers: file # 输出位置,有两个可选项 —— file 和 stdout。选择 file 会将日志记录到 logger_file 指定的日志文件中,选择 stdout 会将日志输出到标准输出,当然也可以两者同时选择
Expand Down
18 changes: 9 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
)

const (
VERSION = "2.2.0"
CHECK_CLIENT_INTERVAL = 15 * 60
EXPIRE_LIMIT = 15 * 60
)
Expand All @@ -55,7 +56,6 @@ var (
signalCertPath string
signalKeyPath string

version string
versionNum int
compressionEnabled bool
compressionLevel int
Expand All @@ -76,6 +76,8 @@ var (
func init() {
pflag.Parse()

versionNum = util.GetVersionNum(VERSION)

// Initialize viper
if *cfg != "" {
viper.SetConfigFile(*cfg) // 如果指定了配置文件,则解析指定的配置文件
Expand Down Expand Up @@ -262,10 +264,10 @@ func main() {
http.HandleFunc("/wss", wsHandler)
http.HandleFunc("/", wsHandler)
http.HandleFunc("/count", handler.CountHandler())
http.HandleFunc("/version", handler.VersionHandler(version))
http.HandleFunc("/version", handler.VersionHandler(VERSION))

info := handler.SignalInfo{
Version: version,
Version: VERSION,
CompressionEnabled: compressionEnabled,
SecurityEnabled: securityEnabled,
ClusterMode: isCluster,
Expand Down Expand Up @@ -347,11 +349,11 @@ func wsHandler(w http.ResponseWriter, r *http.Request) {
}

hub.DoRegister(c)
// 发送版本号
c.SendMsgVersion(versionNum)
if isCluster {
broadcastClient.BroadcastMsgJoin(id)
go broadcastClient.BroadcastMsgJoin(id)
}
// 发送版本号
c.SendMsgVersion(versionNum)

go func() {
defer func() {
Expand All @@ -362,7 +364,7 @@ func wsHandler(w http.ResponseWriter, r *http.Request) {
closeInvalidConn(c)
}
if isCluster {
broadcastClient.BroadcastMsgLeave(id)
go broadcastClient.BroadcastMsgLeave(id)
}
}()
msg := make([]wsutil.Message, 0, 4)
Expand Down Expand Up @@ -403,8 +405,6 @@ func closeInvalidConn(cli *client.Client) {
}

func setupConfigFromViper() {
version = viper.GetString("version")
versionNum = util.GetVersionNum(version)
compressionEnabled = viper.GetBool("compression.enable")
compressionLevel = viper.GetInt("compression.level")
compressionActivationRatio = viper.GetInt("compression.activationRatio")
Expand Down
1 change: 1 addition & 0 deletions rpcservice/broadcast/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ func (b *Service) Leave(request rpcservice.JoinLeaveReq, reply *rpcservice.RpcRe
}

func (h *Service) Pong(request rpcservice.Ping, reply *rpcservice.Pong) error {
//log.Warnf("receive ping from ",)
return nil
}
2 changes: 1 addition & 1 deletion rpcservice/heartbeat/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func RegisterHeartbeatService() error {

func (h *Service) Pong(request PingReq, reply *PongResp) error {
addr := request.Addr
//log.Infof("receive ping from %s", addr)
log.Infof("receive ping from %s", addr)
p, ok := h.Nodes[addr]
if ok {
p.UpdateTs()
Expand Down
44 changes: 40 additions & 4 deletions rpcservice/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
DIAL_MAX_ATTENTS = 2
ATTENTS_INTERVAL = 2 // second
PING_INTERVAL = 5
READ_TIMEOUT = 1500 * time.Millisecond
)

type JoinLeaveReq struct {
Expand Down Expand Up @@ -116,23 +117,58 @@ func (s *Node) SendMsgLeave(request JoinLeaveReq, reply *RpcResp) error {
}
log.Infof("SendMsgLeave to %s", s.addr)
request.Addr = s.addr
return s.Client.Call(BROADCAST_SERVICE+LEAVE, request, reply)
return s.sendInternal(BROADCAST_SERVICE+LEAVE, request, reply)
}

func (s *Node) SendMsgSignal(request SignalReq, reply *RpcResp) error {
if !s.isAlive {
return errors.New(fmt.Sprintf("node %s is not alive", s.addr))
}
//log.Infof("SendMsgSignal to %s", s.addr)
return s.Client.Call(SIGNAL_SERVICE+SIGNAL, request, reply)
return s.sendInternal(SIGNAL_SERVICE+SIGNAL, request, reply)
}

func (s *Node) SendMsgPing(request Ping, reply *Pong) error {
if !s.isAlive {
return errors.New(fmt.Sprintf("node %s is not alive", s.addr))
}
//log.Infof("SendMsgPing to %s", s.addr)
return s.Client.Call(BROADCAST_SERVICE+PONG, request, reply)
log.Infof("SendMsgPing to %s", s.addr)
return s.sendInternal(BROADCAST_SERVICE+PONG, request, reply)
}

func (s *Node) sendInternal(method string, args interface{}, reply interface{}) error {
//start := time.Now()
//done := make(chan error, 1)

done := make(chan *rpc.Call, 1)



//client.Go(method, args, reply, done)

s.Client.Go(method, args, reply, done)
//return call.Error

select {
case <-time.After(READ_TIMEOUT):
//log.Warnf("rpc call timeout %s", method)
//s.Client.Close()
return fmt.Errorf("rpc call timeout %s", method)
case err := <-done:
//elapsed := time.Since(start)
//log.Warnf("6666 %d %d", elapsed.Nanoseconds(), PRINT_WARN_LIMIT_NANO)
//if elapsed.Nanoseconds() >= PRINT_WARN_LIMIT_NANO {
// log.Warnf("rpc send %s cost %v", method, elapsed)
//}

if err.Error != nil {
//rpcClient.Close()
return err.Error
}

}

return nil
}

func (s *Node) StartHeartbeat() {
Expand Down

0 comments on commit a5d5690

Please sign in to comment.