Skip to content

Commit

Permalink
cmap optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
snowinszu committed Mar 30, 2021
1 parent 4331beb commit 0082e83
Show file tree
Hide file tree
Showing 15 changed files with 583 additions and 246 deletions.
1 change: 1 addition & 0 deletions admin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ function start()
{
chmod +x $FILE_NAME

sudo sysctl -w net.core.somaxconn=65535
sudo sysctl -w fs.file-max=6000000
sudo sysctl -w fs.nr_open=6000000
ulimit -n 6000000
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *Client)sendData(data []byte, binary bool) (error, bool) {
err := wsutil.WriteServerMessage(c.Conn, opCode, data)
if err != nil {
// handle error
log.Infof("WriteServerMessage " + err.Error())
log.Warnf("WriteServerMessage " + err.Error())
return err, true
}
} else {
Expand Down
6 changes: 5 additions & 1 deletion handler/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ func (s *SignalHandler)Handle() {
Data: s.Msg.Data,
}
if err, fatal := hub.SendJsonToClient(target, resp); err != nil {
log.Warnf("%s send signal to peer %s error %s", s.Cli.PeerId, target.PeerId, err)
peerType := "local"
if !target.LocalNode {
peerType = "remote"
}
log.Warnf("%s send signal to %s peer %s error %s", s.Cli.PeerId, peerType, target.PeerId, err)
if !fatal {
//hub.RemoveClient(target.PeerId)
s.Cli.EnqueueNotFoundPeer(target.PeerId)
Expand Down
6 changes: 1 addition & 5 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ func DoRegisterRemoteClient(peerId string, addr string) {
}

func GetClient(id string) (*client.Client, bool) {
cli, ok := h.Clients.Get(id)
if !ok {
return nil, false
}
return cli.(*client.Client), true
return h.Clients.Get(id)
}

func RemoveClient(id string) {
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/spf13/viper"
"net"
"net/http"
_ "net/http/pprof"
//_ "net/http/pprof"
"net/rpc"
"os"
"os/signal"
Expand Down Expand Up @@ -152,7 +152,7 @@ func init() {
log.Warnf("start check client alive...")
count := 0
for item := range hub.GetInstance().Clients.IterBuffered() {
cli := item.Val.(*client.Client)
cli := item.Val
if cli.LocalNode && cli.IsExpired(now, EXPIRE_LIMIT) {
// 节点过期
//log.Warnf("client %s is expired for %d, close it", cli.PeerId, now-cli.Timestamp)
Expand Down
10 changes: 10 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,17 @@ func TestTime(t *testing.T) {
start := time.Now()
time.Sleep(300*time.Microsecond)
t.Logf("%d %d", time.Since(start).Nanoseconds(), s)
}

func TestFnv32(t *testing.T) {
key := "ddffvfgfgf"
hash := uint32(2166136261)
const prime32 = uint32(16777619)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
t.Log(hash)
}


Expand Down
4 changes: 1 addition & 3 deletions rpcservice/heartbeat/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package heartbeat

import (
"cbsignal/client"
"cbsignal/hub"
"cbsignal/rpcservice"
"github.com/lexkong/log"
Expand Down Expand Up @@ -148,8 +147,7 @@ func deletePeersInNode(addr string) {
//})

for item := range hub.GetInstance().Clients.IterBuffered() {
val := item.Val
cli := val.(*client.Client)
cli := item.Val
if cli.RpcNodeAddr == addr {
log.Infof("delete cli %s in deleted node %s", cli.PeerId, addr)
hub.DoUnregister(cli.PeerId)
Expand Down
5 changes: 2 additions & 3 deletions rpcservice/heartbeat/service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package heartbeat

import (
"cbsignal/client"
"cbsignal/hub"
"cbsignal/rpcservice"
"github.com/lexkong/log"
Expand All @@ -11,7 +10,7 @@ import (

const (
CHECK_INTERVAL = 25
EXPIRE_TOMEOUT = 21
EXPIRE_TOMEOUT = 15
)

type PongResp struct {
Expand Down Expand Up @@ -85,7 +84,7 @@ func (h *Service)Peers(request GetPeersReq, reply *PeersResp) error {
// return true
//})
for item := range hub.GetInstance().Clients.IterBuffered() {
cli := item.Val.(*client.Client)
cli := item.Val
peer := Peer{
PeerId: cli.PeerId,
RpcNodeAddr: cli.RpcNodeAddr,
Expand Down
149 changes: 74 additions & 75 deletions rpcservice/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ const (
PING_INTERVAL = 5
DIAL_TIMEOUT = 3 // second
READ_TIMEOUT = 1500 * time.Millisecond
PRINT_WARN_LIMIT_NANO = 100 * 1000000
PRINT_WARN_LIMIT_NANO = 100 * time.Millisecond
POOL_MIN_CONNS = 5
POOL_MAX_CONNS = 50
POOL_MAX_CONNS = 32
)


Expand All @@ -50,36 +50,66 @@ type Ping struct {
type Pong struct {
}

//type Conn struct {
// *rpc.Client
//}

//func (c *Conn)close() error {
// return c.Close()
//}

type Node struct {
sync.Mutex
addr string // ip:port
ts int64
isAlive bool // 是否存活
connPool *pool.GenericPool
connPool pool.Pool
Released bool
}

//func NewNode(addr string) *Node {
// node := Node{
// addr: addr,
// ts: time.Now().Unix(),
// }
// // 创建连接池
// p, err := pool.NewGenericPool(POOL_MIN_CONNS, POOL_MAX_CONNS, time.Minute*10, func() (*rpc.Client, error) {
// c, err := rpc.Dial("tcp", addr)
// if err != nil {
//
// return nil, err
// }
// return c, nil
// })
// if err != nil {
// panic(err)
// }
// node.connPool = p
// return &node
//}

func NewNode(addr string) *Node {
node := Node{
addr: addr,
ts: time.Now().Unix(),
}
// 创建连接池
p, err := pool.NewGenericPool(POOL_MIN_CONNS, POOL_MAX_CONNS, time.Minute*10, func() (*rpc.Client, error) {

//factory 创建连接的方法
factory := func() (*rpc.Client, error) {
c, err := rpc.Dial("tcp", addr)
if err != nil {

return nil, err
}
return c, nil
})
}

//close 关闭连接的方法
closer := func(v *rpc.Client) error { return v.Close() }

poolConfig := &pool.Config{
InitialCap: POOL_MIN_CONNS, //资源池初始连接数
MaxIdle: POOL_MAX_CONNS, //最大空闲连接数
MaxCap: POOL_MAX_CONNS,//最大并发连接数
Factory: factory,
Close: closer,
//连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题
IdleTimeout: 60 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)

if err != nil {
panic(err)
}
Expand All @@ -94,32 +124,6 @@ func (s *Node)DialNode() error {
return nil
}

func (s *Node)dialWithChannel(client *rpc.Client) error {
attemts := 0
s.Lock()
s.isAlive = false
s.Unlock()
for {
c, err := rpc.Dial("tcp", s.addr)
if err != nil {
if attemts >= DIAL_MAX_ATTENTS {
return err
}
attemts++
log.Errorf(err, "dialWithChannel")
time.Sleep(ATTENTS_INTERVAL * time.Second)
continue
}
client = c
break
}
log.Warnf("Dial node %s succeed", s.addr)
s.Lock()
s.isAlive = true
s.Unlock()
return nil
}

func (s *Node) UpdateTs() {
s.ts = time.Now().Unix()
}
Expand All @@ -141,13 +145,8 @@ func (s *Node) SendMsgJoin(request JoinLeaveReq, reply *RpcResp) error {
return errors.New(fmt.Sprintf("node %s is not alive", s.addr))
}
log.Infof("SendMsgJoin to %s", s.addr)
client, err := s.connPool.Acquire()
if err != nil {
return err
}
err = s.sendInternal(BROADCAST_SERVICE+JOIN, request, reply, client)
s.connPool.Release(client)
return err

return s.sendMsg(BROADCAST_SERVICE+JOIN, request, reply)
}

func (s *Node) SendMsgLeave(request JoinLeaveReq, reply *RpcResp) error {
Expand All @@ -156,45 +155,40 @@ func (s *Node) SendMsgLeave(request JoinLeaveReq, reply *RpcResp) error {
}
log.Infof("SendMsgLeave to %s", s.addr)
request.Addr = s.addr
client, err := s.connPool.Acquire()
if err != nil {
return err
}
err = s.sendInternal(BROADCAST_SERVICE+LEAVE, request, reply, client)
s.connPool.Release(client)
return err
return s.sendMsg(BROADCAST_SERVICE+LEAVE, request, reply)
}

func (s *Node) SendMsgSignal(request SignalReq, reply *RpcResp) error {
if !s.isAlive {
return errors.New(fmt.Sprintf("node %s is not alive", s.addr))
}
//log.Infof("SendMsgSignal to %s", s.addr)
client, err := s.connPool.Acquire()
if err != nil {
return err
}
err = s.sendInternal(SIGNAL_SERVICE+SIGNAL, request, reply, client)
s.connPool.Release(client)
return err

return s.sendMsg(SIGNAL_SERVICE+SIGNAL, request, reply)

}

func (s *Node) SendMsgPing(request Ping, reply *Pong) error {
if !s.isAlive {
return errors.New(fmt.Sprintf("node %s is not alive", s.addr))
}
//log.Infof("SendMsgPing to %s", s.addr)
return s.sendMsg(BROADCAST_SERVICE+PONG, request, reply)
}

func (s *Node) sendMsg(method string, request interface{}, reply interface{}) error {
client, err := s.connPool.Acquire()

if err != nil {
return err
}
err = s.sendInternal(BROADCAST_SERVICE+PONG, request, reply, client)
s.connPool.Release(client)
err = s.sendInternal(method, request, reply, client)
if err != nil {
s.connPool.Close(client)
} else {
s.connPool.Release(client)
}
return err
}

func (s *Node) sendInternal(method string, args interface{}, reply interface{}, client *rpc.Client ) error {
func (s *Node) sendInternal(method string, request interface{}, reply interface{}, client *rpc.Client) error {
//start := time.Now()
//done := make(chan error, 1)

Expand All @@ -209,8 +203,8 @@ func (s *Node) sendInternal(method string, args interface{}, reply interface{},
// done <- err
//}()

client.Go(method, args, reply, done)

client.Go(method, request, reply, done)
//s.connPool.Release(client)
//s.Client.Go(method, args, reply, done)
//return call.Error

Expand All @@ -220,9 +214,10 @@ func (s *Node) sendInternal(method string, args interface{}, reply interface{},
//s.Client.Close()
return fmt.Errorf("rpc call timeout %s", method)
case call := <-done:
//.Add(timeout).Before(time.Now())
//elapsed := time.Since(start)
//log.Warnf("6666 %d %d", elapsed.Nanoseconds(), PRINT_WARN_LIMIT_NANO)
//if elapsed.Nanoseconds() >= PRINT_WARN_LIMIT_NANO {
//if start.Add(PRINT_WARN_LIMIT_NANO).Before(time.Now()) {
// log.Warnf("rpc send %s cost %v", method, elapsed)
//}

Expand All @@ -242,18 +237,22 @@ func (s *Node) sendInternal(method string, args interface{}, reply interface{},
func (s *Node) StartHeartbeat() {
go func() {
for {
log.Warnf("GenericPool now conn %d idle %d", s.connPool.NumTotalConn(), s.connPool.NumIdleConn())
if s.Released {
//log.Warnf("%s s.Released", s.addr)
break
}
log.Warnf("ConnPool %s conn %d idle %d", s.addr, s.connPool.NumTotalConn(), s.connPool.NumIdleConn())
ping := Ping{}
var pong Pong
if err := s.SendMsgPing(ping, &pong); err != nil {
log.Errorf(err, "node heartbeat")
s.Lock()
s.isAlive = false
s.Unlock()
if err := s.DialNode(); err != nil {
log.Errorf(err, "dial node")
break
}
} else {
s.Lock()
s.isAlive = true
s.Unlock()
}
time.Sleep(PING_INTERVAL * time.Second)
}
Expand Down
4 changes: 4 additions & 0 deletions rpcservice/nodehub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func GetNode(addr string) (*Node, bool) {
func (n *NodeHub) Delete(addr string) {
log.Warnf("NodeHub delete %s", addr)
n.mu.Lock()
if node, ok := n.nodes[addr]; ok {
node.Released = true
node.connPool.Shutdown()
}
delete(n.nodes, addr)
n.mu.Unlock()
}
Expand Down
Loading

0 comments on commit 0082e83

Please sign in to comment.