Skip to content

Commit

Permalink
release 2.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
snowinszu committed Apr 1, 2021
2 parents a5d5690 + 0082e83 commit 65d7c20
Show file tree
Hide file tree
Showing 17 changed files with 698 additions and 263 deletions.
1 change: 1 addition & 0 deletions admin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ function start()
{
chmod +x $FILE_NAME

sudo sysctl -w net.core.somaxconn=65535
sudo sysctl -w fs.file-max=6000000
sudo sysctl -w fs.nr_open=6000000
ulimit -n 6000000
Expand Down
27 changes: 15 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func (c *Client)SendMsgClose(reason string) error {
log.Error("json.Marshal", err)
return err
}
return c.SendMessage(b)
err, _ = c.SendMessage(b)
return err
}

func (c *Client)SendMsgVersion(version int) error {
Expand All @@ -91,18 +92,19 @@ func (c *Client)SendMsgVersion(version int) error {
log.Error("json.Marshal", err)
return err
}
return c.SendMessage(b)
err, _ = c.SendMessage(b)
return err
}

func (c *Client)SendMessage(msg []byte) error {
func (c *Client)SendMessage(msg []byte) (error, bool) {
return c.sendData(msg, false)
}

func (c *Client)SendBinaryData(data []byte) error {
func (c *Client)SendBinaryData(data []byte) (error, bool) {
return c.sendData(data, true)
}

func (c *Client)sendData(data []byte, binary bool) error {
func (c *Client)sendData(data []byte, binary bool) (error, bool) {
var opCode ws.OpCode
if binary {
opCode = ws.OpBinary
Expand All @@ -115,16 +117,16 @@ func (c *Client)sendData(data []byte, binary bool) error {
err := wsutil.WriteServerMessage(c.Conn, opCode, data)
if err != nil {
// handle error
log.Infof("WriteServerMessage " + err.Error())
return err
log.Warnf("WriteServerMessage " + err.Error())
return err, true
}
} else {
// 非本地节点
//log.Warnf("send signal to remote node %s to peer %s", c.RpcNodeAddr, c.PeerId)
node, ok := rpcservice.GetNode(c.RpcNodeAddr)
if ok {
if !node.IsAlive() {
return fmt.Errorf("node %s is not alive when send signal", node.Addr())
return fmt.Errorf("node %s is not alive when send signal", node.Addr()), true
}
req := rpcservice.SignalReq{
ToPeerId: c.PeerId,
Expand All @@ -134,19 +136,20 @@ func (c *Client)sendData(data []byte, binary bool) error {
err := node.SendMsgSignal(req, &resp)
if err != nil {
//log.Warnf("SendMsgSignal to remote failed " + err.Error())
//log.Warnf("req %+v", req)
// 节点出现问题
//node.DialNode()
return err
//node.dialWithChannel()
return err, true
}
if !resp.Success {
//log.Warnf("SendMsgSignal failed reason " + resp.Reason)
return fmt.Errorf(resp.Reason)
return fmt.Errorf(resp.Reason), false
}
} else {
log.Warnf("node %s not found", c.RpcNodeAddr)
}
}
return nil
return nil, false
}

func (c *Client)Close() error {
Expand Down
28 changes: 19 additions & 9 deletions handler/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,32 @@ func (s *SignalHandler)Handle() {
//h := hub.GetInstance()
//log.Infof("load client Msg %v", s.Msg)
//判断节点是否还在线
if target, ok := hub.GetClient(s.Msg.ToPeerId); ok {
if target, ok := hub.GetClient(s.Msg.ToPeerId); ok && !s.Cli.HasNotFoundPeer(s.Msg.ToPeerId) {
//log.Infof("found client %s", s.Msg.ToPeerId)
if s.Cli.PeerId == "" {
log.Warnf("PeerId is not valid")
return
}
resp := SignalResp{
Action: "signal",
FromPeerId: s.Cli.PeerId,
Data: s.Msg.Data,
}
if err := hub.SendJsonToClient(target, resp); err != nil {
log.Warnf("Send signal to peer %s error %s", target.PeerId, err)
hub.RemoveClient(target.PeerId)
s.Cli.EnqueueNotFoundPeer(target.PeerId)
notFounResp := SignalResp{
Action: "signal",
FromPeerId: s.Msg.ToPeerId,
if err, fatal := hub.SendJsonToClient(target, resp); err != nil {
peerType := "local"
if !target.LocalNode {
peerType = "remote"
}
log.Warnf("%s send signal to %s peer %s error %s", s.Cli.PeerId, peerType, target.PeerId, err)
if !fatal {
//hub.RemoveClient(target.PeerId)
s.Cli.EnqueueNotFoundPeer(target.PeerId)
notFounResp := SignalResp{
Action: "signal",
FromPeerId: target.PeerId,
}
hub.SendJsonToClient(s.Cli, notFounResp)
}
hub.SendJsonToClient(s.Cli, notFounResp)
}
//if !target.(*client.Client).LocalNode {
// log.Warnf("send signal msg from %s to %s on node %s", s.Cli.PeerId, s.Msg.ToPeerId, target.(*client.Client).RpcNodeAddr)
Expand Down
19 changes: 7 additions & 12 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func DoRegister(client *client.Client) {
log.Infof("hub DoRegister %s", client.PeerId)
if client.PeerId != "" {
h.Clients.Set(client.PeerId, client)
} else {
panic("DoRegister")
}
}

Expand All @@ -56,11 +58,7 @@ func DoRegisterRemoteClient(peerId string, addr string) {
}

func GetClient(id string) (*client.Client, bool) {
cli, ok := h.Clients.Get(id)
if !ok {
return nil, false
}
return cli.(*client.Client), true
return h.Clients.Get(id)
}

func RemoveClient(id string) {
Expand All @@ -80,11 +78,11 @@ func DoUnregister(peerId string) bool {
}

// send json object to a client with peerId
func SendJsonToClient(target *client.Client, value interface{}) error {
func SendJsonToClient(target *client.Client, value interface{}) (error, bool) {
b, err := json.Marshal(value)
if err != nil {
log.Error("json.Marshal", err)
return err
return err, false
}
//if target == nil {
// //log.Printf("sendJsonToClient error")
Expand All @@ -101,12 +99,9 @@ func SendJsonToClient(target *client.Client, value interface{}) error {


} else {
if err := target.SendMessage(b); err != nil {
//log.Warnf("sendMessage", err)
return err
}
return target.SendMessage(b)
}
return nil
return nil, false
}

func ClearAll() {
Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/spf13/viper"
"net"
"net/http"
_ "net/http/pprof"
//_ "net/http/pprof"
"net/rpc"
"os"
"os/signal"
Expand Down Expand Up @@ -152,7 +152,7 @@ func init() {
log.Warnf("start check client alive...")
count := 0
for item := range hub.GetInstance().Clients.IterBuffered() {
cli := item.Val.(*client.Client)
cli := item.Val
if cli.LocalNode && cli.IsExpired(now, EXPIRE_LIMIT) {
// 节点过期
//log.Warnf("client %s is expired for %d, close it", cli.PeerId, now-cli.Timestamp)
Expand Down Expand Up @@ -279,6 +279,7 @@ func main() {

<-intrChan

broadcastClient.GetNodeHub().Clear()
log.Info("Shutting down server...")
}

Expand Down
19 changes: 19 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strconv"
"strings"
"testing"
"time"
)

func TestUtilizationRate(t *testing.T) {
Expand All @@ -22,6 +23,24 @@ func TestVersion(t *testing.T) {
t.Logf("%d", a*10 + b)
}

func TestTime(t *testing.T) {
s := 2 * time.Second.Nanoseconds()
start := time.Now()
time.Sleep(300*time.Microsecond)
t.Logf("%d %d", time.Since(start).Nanoseconds(), s)
}

func TestFnv32(t *testing.T) {
key := "ddffvfgfgf"
hash := uint32(2166136261)
const prime32 = uint32(16777619)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
t.Log(hash)
}




4 changes: 4 additions & 0 deletions rpcservice/broadcast/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@ func (c *Client) BroadcastMsgLeave(id string) {
}
}

func (c *Client) GetNodeHub() *rpcservice.NodeHub {
return c.nodeHub
}

2 changes: 1 addition & 1 deletion rpcservice/broadcast/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ func (b *Service) Leave(request rpcservice.JoinLeaveReq, reply *rpcservice.RpcRe
}

func (h *Service) Pong(request rpcservice.Ping, reply *rpcservice.Pong) error {
//log.Warnf("receive ping from ",)
//time.Sleep(1 * time.Second)
return nil
}
5 changes: 2 additions & 3 deletions rpcservice/heartbeat/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package heartbeat

import (
"cbsignal/client"
"cbsignal/hub"
"cbsignal/rpcservice"
"github.com/lexkong/log"
Expand Down Expand Up @@ -51,6 +50,7 @@ func (h *Client) NodeHub() *rpcservice.NodeHub {
return h.nodeHub
}

// 连接master并向master获取节点
func (h *Client) DialHeartbeatService() {
if h.masterAddr == "" {
panic("masterAddr is nil")
Expand Down Expand Up @@ -147,8 +147,7 @@ func deletePeersInNode(addr string) {
//})

for item := range hub.GetInstance().Clients.IterBuffered() {
val := item.Val
cli := val.(*client.Client)
cli := item.Val
if cli.RpcNodeAddr == addr {
log.Infof("delete cli %s in deleted node %s", cli.PeerId, addr)
hub.DoUnregister(cli.PeerId)
Expand Down
7 changes: 3 additions & 4 deletions rpcservice/heartbeat/service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package heartbeat

import (
"cbsignal/client"
"cbsignal/hub"
"cbsignal/rpcservice"
"github.com/lexkong/log"
Expand All @@ -11,10 +10,9 @@ import (

const (
CHECK_INTERVAL = 25
EXPIRE_TOMEOUT = 21
EXPIRE_TOMEOUT = 15
)


type PongResp struct {
Nodes []string
}
Expand Down Expand Up @@ -52,6 +50,7 @@ func RegisterHeartbeatService() error {
}
}
}()
//log.Warnf("do RegisterName(HEARTBEAT_SERVICE, s)")
return rpc.RegisterName(HEARTBEAT_SERVICE, s)
}

Expand Down Expand Up @@ -85,7 +84,7 @@ func (h *Service)Peers(request GetPeersReq, reply *PeersResp) error {
// return true
//})
for item := range hub.GetInstance().Clients.IterBuffered() {
cli := item.Val.(*client.Client)
cli := item.Val
peer := Peer{
PeerId: cli.PeerId,
RpcNodeAddr: cli.RpcNodeAddr,
Expand Down
Loading

0 comments on commit 65d7c20

Please sign in to comment.