diff --git a/admin.sh b/admin.sh index dd3a7b1..2c72c91 100644 --- a/admin.sh +++ b/admin.sh @@ -63,6 +63,7 @@ function start() { chmod +x $FILE_NAME + sudo sysctl -w net.core.somaxconn=65535 sudo sysctl -w fs.file-max=6000000 sudo sysctl -w fs.nr_open=6000000 ulimit -n 6000000 diff --git a/client/client.go b/client/client.go index 776f3dc..679037e 100644 --- a/client/client.go +++ b/client/client.go @@ -78,7 +78,8 @@ func (c *Client)SendMsgClose(reason string) error { log.Error("json.Marshal", err) return err } - return c.SendMessage(b) + err, _ = c.SendMessage(b) + return err } func (c *Client)SendMsgVersion(version int) error { @@ -91,18 +92,19 @@ func (c *Client)SendMsgVersion(version int) error { log.Error("json.Marshal", err) return err } - return c.SendMessage(b) + err, _ = c.SendMessage(b) + return err } -func (c *Client)SendMessage(msg []byte) error { +func (c *Client)SendMessage(msg []byte) (error, bool) { return c.sendData(msg, false) } -func (c *Client)SendBinaryData(data []byte) error { +func (c *Client)SendBinaryData(data []byte) (error, bool) { return c.sendData(data, true) } -func (c *Client)sendData(data []byte, binary bool) error { +func (c *Client)sendData(data []byte, binary bool) (error, bool) { var opCode ws.OpCode if binary { opCode = ws.OpBinary @@ -115,8 +117,8 @@ func (c *Client)sendData(data []byte, binary bool) error { err := wsutil.WriteServerMessage(c.Conn, opCode, data) if err != nil { // handle error - log.Infof("WriteServerMessage " + err.Error()) - return err + log.Warnf("WriteServerMessage " + err.Error()) + return err, true } } else { // 非本地节点 @@ -124,7 +126,7 @@ func (c *Client)sendData(data []byte, binary bool) error { node, ok := rpcservice.GetNode(c.RpcNodeAddr) if ok { if !node.IsAlive() { - return fmt.Errorf("node %s is not alive when send signal", node.Addr()) + return fmt.Errorf("node %s is not alive when send signal", node.Addr()), true } req := rpcservice.SignalReq{ ToPeerId: c.PeerId, @@ -134,19 +136,20 @@ func (c *Client)sendData(data []byte, binary bool) error { err := node.SendMsgSignal(req, &resp) if err != nil { //log.Warnf("SendMsgSignal to remote failed " + err.Error()) + //log.Warnf("req %+v", req) // 节点出现问题 - //node.DialNode() - return err + //node.dialWithChannel() + return err, true } if !resp.Success { //log.Warnf("SendMsgSignal failed reason " + resp.Reason) - return fmt.Errorf(resp.Reason) + return fmt.Errorf(resp.Reason), false } } else { log.Warnf("node %s not found", c.RpcNodeAddr) } } - return nil + return nil, false } func (c *Client)Close() error { diff --git a/handler/signal.go b/handler/signal.go index 11cc7a4..d3af1f4 100644 --- a/handler/signal.go +++ b/handler/signal.go @@ -15,22 +15,32 @@ func (s *SignalHandler)Handle() { //h := hub.GetInstance() //log.Infof("load client Msg %v", s.Msg) //判断节点是否还在线 - if target, ok := hub.GetClient(s.Msg.ToPeerId); ok { + if target, ok := hub.GetClient(s.Msg.ToPeerId); ok && !s.Cli.HasNotFoundPeer(s.Msg.ToPeerId) { //log.Infof("found client %s", s.Msg.ToPeerId) + if s.Cli.PeerId == "" { + log.Warnf("PeerId is not valid") + return + } resp := SignalResp{ Action: "signal", FromPeerId: s.Cli.PeerId, Data: s.Msg.Data, } - if err := hub.SendJsonToClient(target, resp); err != nil { - log.Warnf("Send signal to peer %s error %s", target.PeerId, err) - hub.RemoveClient(target.PeerId) - s.Cli.EnqueueNotFoundPeer(target.PeerId) - notFounResp := SignalResp{ - Action: "signal", - FromPeerId: s.Msg.ToPeerId, + if err, fatal := hub.SendJsonToClient(target, resp); err != nil { + peerType := "local" + if !target.LocalNode { + peerType = "remote" + } + log.Warnf("%s send signal to %s peer %s error %s", s.Cli.PeerId, peerType, target.PeerId, err) + if !fatal { + //hub.RemoveClient(target.PeerId) + s.Cli.EnqueueNotFoundPeer(target.PeerId) + notFounResp := SignalResp{ + Action: "signal", + FromPeerId: target.PeerId, + } + hub.SendJsonToClient(s.Cli, notFounResp) } - hub.SendJsonToClient(s.Cli, notFounResp) } //if !target.(*client.Client).LocalNode { // log.Warnf("send signal msg from %s to %s on node %s", s.Cli.PeerId, s.Msg.ToPeerId, target.(*client.Client).RpcNodeAddr) diff --git a/hub/hub.go b/hub/hub.go index 6930760..47a848e 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -43,6 +43,8 @@ func DoRegister(client *client.Client) { log.Infof("hub DoRegister %s", client.PeerId) if client.PeerId != "" { h.Clients.Set(client.PeerId, client) + } else { + panic("DoRegister") } } @@ -56,11 +58,7 @@ func DoRegisterRemoteClient(peerId string, addr string) { } func GetClient(id string) (*client.Client, bool) { - cli, ok := h.Clients.Get(id) - if !ok { - return nil, false - } - return cli.(*client.Client), true + return h.Clients.Get(id) } func RemoveClient(id string) { @@ -80,11 +78,11 @@ func DoUnregister(peerId string) bool { } // send json object to a client with peerId -func SendJsonToClient(target *client.Client, value interface{}) error { +func SendJsonToClient(target *client.Client, value interface{}) (error, bool) { b, err := json.Marshal(value) if err != nil { log.Error("json.Marshal", err) - return err + return err, false } //if target == nil { // //log.Printf("sendJsonToClient error") @@ -101,12 +99,9 @@ func SendJsonToClient(target *client.Client, value interface{}) error { } else { - if err := target.SendMessage(b); err != nil { - //log.Warnf("sendMessage", err) - return err - } + return target.SendMessage(b) } - return nil + return nil, false } func ClearAll() { diff --git a/main.go b/main.go index 7e6f0fa..d61f855 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,7 @@ import ( "github.com/spf13/viper" "net" "net/http" - _ "net/http/pprof" + //_ "net/http/pprof" "net/rpc" "os" "os/signal" @@ -152,7 +152,7 @@ func init() { log.Warnf("start check client alive...") count := 0 for item := range hub.GetInstance().Clients.IterBuffered() { - cli := item.Val.(*client.Client) + cli := item.Val if cli.LocalNode && cli.IsExpired(now, EXPIRE_LIMIT) { // 节点过期 //log.Warnf("client %s is expired for %d, close it", cli.PeerId, now-cli.Timestamp) @@ -279,6 +279,7 @@ func main() { <-intrChan + broadcastClient.GetNodeHub().Clear() log.Info("Shutting down server...") } diff --git a/main_test.go b/main_test.go index 05b9890..7a342b2 100644 --- a/main_test.go +++ b/main_test.go @@ -4,6 +4,7 @@ import ( "strconv" "strings" "testing" + "time" ) func TestUtilizationRate(t *testing.T) { @@ -22,6 +23,24 @@ func TestVersion(t *testing.T) { t.Logf("%d", a*10 + b) } +func TestTime(t *testing.T) { + s := 2 * time.Second.Nanoseconds() + start := time.Now() + time.Sleep(300*time.Microsecond) + t.Logf("%d %d", time.Since(start).Nanoseconds(), s) +} + +func TestFnv32(t *testing.T) { + key := "ddffvfgfgf" + hash := uint32(2166136261) + const prime32 = uint32(16777619) + for i := 0; i < len(key); i++ { + hash *= prime32 + hash ^= uint32(key[i]) + } + t.Log(hash) +} + diff --git a/rpcservice/broadcast/client.go b/rpcservice/broadcast/client.go index c690115..7685d65 100644 --- a/rpcservice/broadcast/client.go +++ b/rpcservice/broadcast/client.go @@ -52,3 +52,7 @@ func (c *Client) BroadcastMsgLeave(id string) { } } +func (c *Client) GetNodeHub() *rpcservice.NodeHub { + return c.nodeHub +} + diff --git a/rpcservice/broadcast/service.go b/rpcservice/broadcast/service.go index 9ce78a3..2a277a7 100644 --- a/rpcservice/broadcast/service.go +++ b/rpcservice/broadcast/service.go @@ -32,6 +32,6 @@ func (b *Service) Leave(request rpcservice.JoinLeaveReq, reply *rpcservice.RpcRe } func (h *Service) Pong(request rpcservice.Ping, reply *rpcservice.Pong) error { - //log.Warnf("receive ping from ",) + //time.Sleep(1 * time.Second) return nil } diff --git a/rpcservice/heartbeat/client.go b/rpcservice/heartbeat/client.go index c476df0..8916488 100644 --- a/rpcservice/heartbeat/client.go +++ b/rpcservice/heartbeat/client.go @@ -1,7 +1,6 @@ package heartbeat import ( - "cbsignal/client" "cbsignal/hub" "cbsignal/rpcservice" "github.com/lexkong/log" @@ -51,6 +50,7 @@ func (h *Client) NodeHub() *rpcservice.NodeHub { return h.nodeHub } +// 连接master并向master获取节点 func (h *Client) DialHeartbeatService() { if h.masterAddr == "" { panic("masterAddr is nil") @@ -147,8 +147,7 @@ func deletePeersInNode(addr string) { //}) for item := range hub.GetInstance().Clients.IterBuffered() { - val := item.Val - cli := val.(*client.Client) + cli := item.Val if cli.RpcNodeAddr == addr { log.Infof("delete cli %s in deleted node %s", cli.PeerId, addr) hub.DoUnregister(cli.PeerId) diff --git a/rpcservice/heartbeat/service.go b/rpcservice/heartbeat/service.go index 3d73e53..9d60db1 100644 --- a/rpcservice/heartbeat/service.go +++ b/rpcservice/heartbeat/service.go @@ -1,7 +1,6 @@ package heartbeat import ( - "cbsignal/client" "cbsignal/hub" "cbsignal/rpcservice" "github.com/lexkong/log" @@ -11,10 +10,9 @@ import ( const ( CHECK_INTERVAL = 25 - EXPIRE_TOMEOUT = 21 + EXPIRE_TOMEOUT = 15 ) - type PongResp struct { Nodes []string } @@ -52,6 +50,7 @@ func RegisterHeartbeatService() error { } } }() + //log.Warnf("do RegisterName(HEARTBEAT_SERVICE, s)") return rpc.RegisterName(HEARTBEAT_SERVICE, s) } @@ -85,7 +84,7 @@ func (h *Service)Peers(request GetPeersReq, reply *PeersResp) error { // return true //}) for item := range hub.GetInstance().Clients.IterBuffered() { - cli := item.Val.(*client.Client) + cli := item.Val peer := Peer{ PeerId: cli.PeerId, RpcNodeAddr: cli.RpcNodeAddr, diff --git a/rpcservice/node.go b/rpcservice/node.go index 8f89a9d..04a5a94 100644 --- a/rpcservice/node.go +++ b/rpcservice/node.go @@ -1,6 +1,7 @@ package rpcservice import ( + "cbsignal/rpcservice/pool" "errors" "fmt" "github.com/lexkong/log" @@ -17,19 +18,25 @@ const ( SIGNAL_SERVICE = "SignalService" SIGNAL = ".Signal" DIAL_MAX_ATTENTS = 2 - ATTENTS_INTERVAL = 2 // second + ATTENTS_INTERVAL = 2 // second PING_INTERVAL = 5 + DIAL_TIMEOUT = 3 // second READ_TIMEOUT = 1500 * time.Millisecond + PRINT_WARN_LIMIT_NANO = 100 * time.Millisecond + POOL_MIN_CONNS = 5 + POOL_MAX_CONNS = 32 ) + + type JoinLeaveReq struct { - PeerId string // 节点id + PeerId string // 节点id Addr string } type RpcResp struct { Success bool - Reason string + Reason string } type SignalReq struct { @@ -38,49 +45,79 @@ type SignalReq struct { } type Ping struct { - } type Pong struct { - } type Node struct { sync.Mutex - *rpc.Client - addr string // ip:port - ts int64 - isAlive bool // 是否存活 + addr string // ip:port + ts int64 + isAlive bool // 是否存活 + connPool pool.Pool + Released bool } +//func NewNode(addr string) *Node { +// node := Node{ +// addr: addr, +// ts: time.Now().Unix(), +// } +// // 创建连接池 +// p, err := pool.NewGenericPool(POOL_MIN_CONNS, POOL_MAX_CONNS, time.Minute*10, func() (*rpc.Client, error) { +// c, err := rpc.Dial("tcp", addr) +// if err != nil { +// +// return nil, err +// } +// return c, nil +// }) +// if err != nil { +// panic(err) +// } +// node.connPool = p +// return &node +//} + func NewNode(addr string) *Node { node := Node{ addr: addr, - ts: time.Now().Unix(), + ts: time.Now().Unix(), } - return &node -} -func (s *Node) DialNode() error { - attemts := 0 - s.Lock() - s.isAlive = false - s.Unlock() - for { - c, err := rpc.Dial("tcp", s.addr) + //factory 创建连接的方法 + factory := func() (*rpc.Client, error) { + c, err := rpc.Dial("tcp", addr) if err != nil { - if attemts >= DIAL_MAX_ATTENTS { - return err - } - attemts ++ - log.Errorf(err, "DialNode") - time.Sleep(ATTENTS_INTERVAL*time.Second) - continue + + return nil, err } - s.Client = c - break + return c, nil } - log.Warnf("Dial node %s succeed", s.addr) + + //close 关闭连接的方法 + closer := func(v *rpc.Client) error { return v.Close() } + + poolConfig := &pool.Config{ + InitialCap: POOL_MIN_CONNS, //资源池初始连接数 + MaxIdle: POOL_MAX_CONNS, //最大空闲连接数 + MaxCap: POOL_MAX_CONNS,//最大并发连接数 + Factory: factory, + Close: closer, + //连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题 + IdleTimeout: 60 * time.Second, + } + p, err := pool.NewChannelPool(poolConfig) + + if err != nil { + panic(err) + } + node.connPool = p + return &node +} + +func (s *Node)DialNode() error { s.Lock() s.isAlive = true s.Unlock() @@ -108,7 +145,8 @@ func (s *Node) SendMsgJoin(request JoinLeaveReq, reply *RpcResp) error { return errors.New(fmt.Sprintf("node %s is not alive", s.addr)) } log.Infof("SendMsgJoin to %s", s.addr) - return s.Client.Call(BROADCAST_SERVICE+JOIN, request, reply) + + return s.sendMsg(BROADCAST_SERVICE+JOIN, request, reply) } func (s *Node) SendMsgLeave(request JoinLeaveReq, reply *RpcResp) error { @@ -117,7 +155,7 @@ func (s *Node) SendMsgLeave(request JoinLeaveReq, reply *RpcResp) error { } log.Infof("SendMsgLeave to %s", s.addr) request.Addr = s.addr - return s.sendInternal(BROADCAST_SERVICE+LEAVE, request, reply) + return s.sendMsg(BROADCAST_SERVICE+LEAVE, request, reply) } func (s *Node) SendMsgSignal(request SignalReq, reply *RpcResp) error { @@ -125,47 +163,72 @@ func (s *Node) SendMsgSignal(request SignalReq, reply *RpcResp) error { return errors.New(fmt.Sprintf("node %s is not alive", s.addr)) } //log.Infof("SendMsgSignal to %s", s.addr) - return s.sendInternal(SIGNAL_SERVICE+SIGNAL, request, reply) + + return s.sendMsg(SIGNAL_SERVICE+SIGNAL, request, reply) + } func (s *Node) SendMsgPing(request Ping, reply *Pong) error { - if !s.isAlive { - return errors.New(fmt.Sprintf("node %s is not alive", s.addr)) + //log.Infof("SendMsgPing to %s", s.addr) + return s.sendMsg(BROADCAST_SERVICE+PONG, request, reply) +} + +func (s *Node) sendMsg(method string, request interface{}, reply interface{}) error { + client, err := s.connPool.Acquire() + + if err != nil { + return err + } + err = s.sendInternal(method, request, reply, client) + if err != nil { + s.connPool.Close(client) + } else { + s.connPool.Release(client) } - log.Infof("SendMsgPing to %s", s.addr) - return s.sendInternal(BROADCAST_SERVICE+PONG, request, reply) + return err } -func (s *Node) sendInternal(method string, args interface{}, reply interface{}) error { +func (s *Node) sendInternal(method string, request interface{}, reply interface{}, client *rpc.Client) error { //start := time.Now() //done := make(chan error, 1) done := make(chan *rpc.Call, 1) + //log.Warnf("GenericPool now conn %d idle %d", s.connPool.NumTotalConn(), s.connPool.NumIdleConn()) + //go func() { + // //log.Warnf("client.Call %s", method) + // err := client.Call(method, args, reply) + // s.connPool.Release(closer) + // done <- err + //}() - //client.Go(method, args, reply, done) - - s.Client.Go(method, args, reply, done) + client.Go(method, request, reply, done) + //s.connPool.Release(client) + //s.Client.Go(method, args, reply, done) //return call.Error select { - case <-time.After(READ_TIMEOUT): - //log.Warnf("rpc call timeout %s", method) - //s.Client.Close() - return fmt.Errorf("rpc call timeout %s", method) - case err := <-done: - //elapsed := time.Since(start) - //log.Warnf("6666 %d %d", elapsed.Nanoseconds(), PRINT_WARN_LIMIT_NANO) - //if elapsed.Nanoseconds() >= PRINT_WARN_LIMIT_NANO { - // log.Warnf("rpc send %s cost %v", method, elapsed) - //} - - if err.Error != nil { - //rpcClient.Close() - return err.Error - } - + case <-time.After(READ_TIMEOUT): + //log.Warnf("rpc call timeout %s", method) + //s.Client.Close() + return fmt.Errorf("rpc call timeout %s", method) + case call := <-done: + //.Add(timeout).Before(time.Now()) + //elapsed := time.Since(start) + //log.Warnf("6666 %d %d", elapsed.Nanoseconds(), PRINT_WARN_LIMIT_NANO) + //if start.Add(PRINT_WARN_LIMIT_NANO).Before(time.Now()) { + // log.Warnf("rpc send %s cost %v", method, elapsed) + //} + + //if err.Error != nil { + // //rpcClient.Close() + // return err.Error + //} + if err := call.Error; err != nil { + //rpcClient.Close() + return err + } } return nil @@ -174,19 +237,24 @@ func (s *Node) sendInternal(method string, args interface{}, reply interface{}) func (s *Node) StartHeartbeat() { go func() { for { + if s.Released { + //log.Warnf("%s s.Released", s.addr) + break + } + log.Warnf("ConnPool %s conn %d idle %d", s.addr, s.connPool.NumTotalConn(), s.connPool.NumIdleConn()) ping := Ping{} var pong Pong - if err := s.SendMsgPing(ping, &pong);err != nil { + if err := s.SendMsgPing(ping, &pong); err != nil { log.Errorf(err, "node heartbeat") s.Lock() s.isAlive = false s.Unlock() - if err := s.DialNode();err != nil { - log.Errorf(err, "dial node") - break - } + } else { + s.Lock() + s.isAlive = true + s.Unlock() } - time.Sleep(PING_INTERVAL*time.Second) + time.Sleep(PING_INTERVAL * time.Second) } }() } @@ -194,4 +262,3 @@ func (s *Node) StartHeartbeat() { func (s *Node) IsAlive() bool { return s.isAlive } - diff --git a/rpcservice/nodehub.go b/rpcservice/nodehub.go index 2f58241..0619faf 100644 --- a/rpcservice/nodehub.go +++ b/rpcservice/nodehub.go @@ -6,7 +6,7 @@ import ( ) type NodeHub struct { - node map[string]*Node + nodes map[string]*Node mu sync.Mutex } @@ -14,7 +14,7 @@ var nodeHub *NodeHub func NewNodeHub() *NodeHub { n := NodeHub{ - node: make(map[string]*Node), + nodes: make(map[string]*Node), } nodeHub = &n return &n @@ -27,31 +27,38 @@ func GetNode(addr string) (*Node, bool) { func (n *NodeHub) Delete(addr string) { log.Warnf("NodeHub delete %s", addr) n.mu.Lock() - delete(n.node, addr) + if node, ok := n.nodes[addr]; ok { + node.Released = true + node.connPool.Shutdown() + } + delete(n.nodes, addr) n.mu.Unlock() } func (n *NodeHub) Add(addr string, peer *Node) { log.Infof("NodeHub add %s", addr) - n.node[addr] = peer + n.nodes[addr] = peer } func (n *NodeHub) Get(addr string) (*Node, bool) { n.mu.Lock() - node, ok := n.node[addr] + node, ok := n.nodes[addr] n.mu.Unlock() return node, ok } func (n *NodeHub) GetAll() map[string]*Node { //log.Infof("NodeHub GetAll %d", len(n.node)) - return n.node + return n.nodes } func (n *NodeHub) Clear() { log.Infof("NodeHub clear") n.mu.Lock() - n.node = make(map[string]*Node) + for _, node := range n.nodes { + node.connPool.Shutdown() + } + n.nodes = make(map[string]*Node) n.mu.Unlock() } diff --git a/rpcservice/pool/channel_pool.go b/rpcservice/pool/channel_pool.go new file mode 100644 index 0000000..96ca49b --- /dev/null +++ b/rpcservice/pool/channel_pool.go @@ -0,0 +1,257 @@ +package pool + +import ( + "errors" + "fmt" + "github.com/lexkong/log" + "net/rpc" + "sync" + "time" + //"reflect" +) + +var ( + //ErrMaxActiveConnReached 连接池超限 + ErrMaxActiveConnReached = errors.New("MaxActiveConnReached") +) + +// Config 连接池相关配置 +type Config struct { + //连接池中拥有的最小连接数 + InitialCap int + //最大并发存活连接数 + MaxCap int + //最大空闲连接 + MaxIdle int + //生成连接的方法 + Factory func() (*rpc.Client, error) + //关闭连接的方法 + Close func(*rpc.Client) error + //检查连接是否有效的方法 + Ping func(*rpc.Client) error + //连接最大空闲时间,超过该事件则将失效 + IdleTimeout time.Duration +} + +type connReq struct { + idleConn *idleConn +} + +// channelPool 存放连接信息 +type channelPool struct { + mu sync.RWMutex + conns chan *idleConn + factory func() (*rpc.Client, error) + close func(*rpc.Client) error + ping func(*rpc.Client) error + idleTimeout, waitTimeOut time.Duration + maxActive int + openingConns int + connReqs []chan connReq +} + +type idleConn struct { + conn *rpc.Client + t time.Time +} + +// NewChannelPool 初始化连接 +func NewChannelPool(poolConfig *Config) (Pool, error) { + if !(poolConfig.InitialCap <= poolConfig.MaxIdle && poolConfig.MaxCap >= poolConfig.MaxIdle && poolConfig.InitialCap >= 0) { + return nil, errors.New("invalid capacity settings") + } + if poolConfig.Factory == nil { + return nil, errors.New("invalid factory func settings") + } + if poolConfig.Close == nil { + return nil, errors.New("invalid close func settings") + } + + c := &channelPool{ + conns: make(chan *idleConn, poolConfig.MaxIdle), + factory: poolConfig.Factory, + close: poolConfig.Close, + idleTimeout: poolConfig.IdleTimeout, + maxActive: poolConfig.MaxCap, + openingConns: poolConfig.InitialCap, + } + + if poolConfig.Ping != nil { + c.ping = poolConfig.Ping + } + + for i := 0; i < poolConfig.InitialCap; i++ { + conn, err := c.factory() + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) + } + c.conns <- &idleConn{conn: conn, t: time.Now()} + } + + return c, nil +} + +// getConns 获取所有连接 +func (c *channelPool) getConns() chan *idleConn { + c.mu.Lock() + conns := c.conns + c.mu.Unlock() + return conns +} + +// Get 从pool中取一个连接 +func (c *channelPool) Acquire() (*rpc.Client, error) { + conns := c.getConns() + if conns == nil { + return nil, ErrClosed + } + for { + select { + case wrapConn := <-conns: + if wrapConn == nil { + return nil, ErrClosed + } + //判断是否超时,超时则丢弃 + if timeout := c.idleTimeout; timeout > 0 { + if wrapConn.t.Add(timeout).Before(time.Now()) { + //丢弃并关闭该连接 + c.Close(wrapConn.conn) + continue + } + } + //判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查 + if c.ping != nil { + if err := c.Ping(wrapConn.conn); err != nil { + c.Close(wrapConn.conn) + continue + } + } + return wrapConn.conn, nil + default: + c.mu.Lock() + log.Debugf("openConn %v %v", c.openingConns, c.maxActive) + if c.openingConns >= c.maxActive { + req := make(chan connReq, 1) + c.connReqs = append(c.connReqs, req) + c.mu.Unlock() + ret, ok := <-req + if !ok { + return nil, ErrMaxActiveConnReached + } + if timeout := c.idleTimeout; timeout > 0 { + if ret.idleConn.t.Add(timeout).Before(time.Now()) { + //丢弃并关闭该连接 + c.Close(ret.idleConn.conn) + continue + } + } + return ret.idleConn.conn, nil + } + if c.factory == nil { + c.mu.Unlock() + return nil, ErrClosed + } + conn, err := c.factory() + if err != nil { + c.mu.Unlock() + return nil, err + } + c.openingConns++ + c.mu.Unlock() + return conn, nil + } + } +} + +// Put 将连接放回pool中 +func (c *channelPool) Release(conn *rpc.Client) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + + c.mu.Lock() + + if c.conns == nil { + c.mu.Unlock() + return c.Close(conn) + } + + if l := len(c.connReqs); l > 0 { + req := c.connReqs[0] + copy(c.connReqs, c.connReqs[1:]) + c.connReqs = c.connReqs[:l-1] + req <- connReq{ + idleConn: &idleConn{conn: conn, t: time.Now()}, + } + c.mu.Unlock() + return nil + } else { + select { + case c.conns <- &idleConn{conn: conn, t: time.Now()}: + c.mu.Unlock() + return nil + default: + c.mu.Unlock() + //连接池已满,直接关闭该连接 + return c.Close(conn) + } + } +} + +// Close 关闭单条连接 +func (c *channelPool) Close(conn *rpc.Client) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + c.mu.Lock() + defer c.mu.Unlock() + if c.close == nil { + return nil + } + c.openingConns-- + return c.close(conn) +} + +// Ping 检查单条连接是否有效 +func (c *channelPool) Ping(conn *rpc.Client) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + return c.ping(conn) +} + +// Release 释放连接池中所有连接 +func (c *channelPool) Shutdown() error { + c.mu.Lock() + conns := c.conns + c.conns = nil + c.factory = nil + c.ping = nil + closeFun := c.close + c.close = nil + c.mu.Unlock() + + if conns == nil { + return nil + } + + close(conns) + for wrapConn := range conns { + //log.Printf("Type %v\n",reflect.TypeOf(wrapConn.conn)) + closeFun(wrapConn.conn) + } + return nil +} + +// Len 连接池中总连接 +func (c *channelPool) NumTotalConn() int { + return c.openingConns +} + +// TODO +func (c *channelPool) NumIdleConn() int { + return len(c.getConns()) +} + + diff --git a/rpcservice/pool/generic_pool.go b/rpcservice/pool/generic_pool.go new file mode 100644 index 0000000..b49d9b0 --- /dev/null +++ b/rpcservice/pool/generic_pool.go @@ -0,0 +1,152 @@ +package pool + +import ( + "errors" + "net/rpc" + "sync" + "time" +) + +var ( + ErrInvalidConfig = errors.New("invalid pool config") + ErrPoolClosed = errors.New("pool closed") +) + +type factory func() (*rpc.Client, error) + + + +type GenericPool struct { + sync.Mutex + pool chan *rpc.Client + maxOpen int // 池中最大资源数 + numOpen int // 当前池中资源数 + minOpen int // 池中最少资源数 + closed bool // 池是否已关闭 + maxLifetime time.Duration + factory factory // 创建连接的方法 +} + +func NewGenericPool(minOpen, maxOpen int, maxLifetime time.Duration, factory factory) (*GenericPool, error) { + if maxOpen <= 0 || minOpen > maxOpen { + return nil, ErrInvalidConfig + } + p := &GenericPool{ + maxOpen: maxOpen, + minOpen: minOpen, + maxLifetime: maxLifetime, + factory: factory, + pool: make(chan *rpc.Client, maxOpen), + } + + for i := 0; i < minOpen; i++ { + closer, err := factory() + if err != nil { + continue + } + p.numOpen++ + p.pool <- closer + } + return p, nil +} + +func (p *GenericPool) Acquire() (*rpc.Client, error) { + if p.closed { + return nil, ErrPoolClosed + } + for { + closer, err := p.getOrCreate() + if err != nil { + return nil, err + } + // todo maxLifttime处理 + return closer, nil + } +} + +func (p *GenericPool) getOrCreate() (*rpc.Client, error) { + select { + case closer := <-p.pool: + return closer, nil + default: + } + p.Lock() + if p.numOpen < p.maxOpen { + // 新建连接 + closer, err := p.factory() + if err != nil { + p.Unlock() + return nil, err + } + p.numOpen++ + p.Unlock() + return closer, nil + } + p.Unlock() + closer := <-p.pool + return closer, nil + + //if p.numOpen >= p.maxOpen { + // //p.Unlock() + // closer := <-p.pool + // return closer, nil + //} + // 新建连接 + //closer, err := p.factory() + //if err != nil { + // //p.Unlock() + // return nil, err + //} + //p.Lock() + //p.numOpen++ + //p.Unlock() + //return closer, nil +} + +// 释放单个资源到连接池 +func (p *GenericPool) Release(closer *rpc.Client) error { + if p.closed { + return ErrPoolClosed + } + //log.Warnf("Release conn now %d idle %d", p.NumTotalConn(), p.NumIdleConn()) + p.Lock() + p.pool <- closer + p.Unlock() + return nil +} + + + +// 关闭单个资源 +func (p *GenericPool) Close(closer *rpc.Client) error { + p.Lock() + closer.Close() + p.numOpen-- + p.Unlock() + return nil +} + +func (p *GenericPool) NumTotalConn() int { + return p.numOpen +} + +func (p *GenericPool) NumIdleConn() int { + return len(p.pool) +} + + +// 关闭连接池,释放所有资源 +func (p *GenericPool) Shutdown() error { + if p.closed { + return ErrPoolClosed + } + p.Lock() + close(p.pool) + for closer := range p.pool { + closer.Close() + p.numOpen-- + } + p.closed = true + p.Unlock() + return nil +} diff --git a/rpcservice/pool/pool.go b/rpcservice/pool/pool.go index 48600b4..1faf6cb 100644 --- a/rpcservice/pool/pool.go +++ b/rpcservice/pool/pool.go @@ -2,128 +2,19 @@ package pool import ( "errors" - "io" - "sync" - "time" + "net/rpc" ) var ( - ErrInvalidConfig = errors.New("invalid pool config") - ErrPoolClosed = errors.New("pool closed") + //ErrClosed 连接池已经关闭Error + ErrClosed = errors.New("pool is closed") ) -type factory func() (io.Closer, error) - type Pool interface { - Acquire() (io.Closer, error) // 获取资源 - Release(io.Closer) error // 释放资源 - Close(io.Closer) error // 关闭资源 + Acquire() (*rpc.Client, error) // 获取资源 + Release(*rpc.Client) error // 释放资源 + Close(*rpc.Client) error // 关闭资源 Shutdown() error // 关闭池 -} - -type GenericPool struct { - sync.Mutex - pool chan io.Closer - maxOpen int // 池中最大资源数 - numOpen int // 当前池中资源数 - minOpen int // 池中最少资源数 - closed bool // 池是否已关闭 - maxLifetime time.Duration - factory factory // 创建连接的方法 -} - -func NewGenericPool(minOpen, maxOpen int, maxLifetime time.Duration, factory factory) (*GenericPool, error) { - if maxOpen <= 0 || minOpen > maxOpen { - return nil, ErrInvalidConfig - } - p := &GenericPool{ - maxOpen: maxOpen, - minOpen: minOpen, - maxLifetime: maxLifetime, - factory: factory, - pool: make(chan io.Closer, maxOpen), - } - - for i := 0; i < minOpen; i++ { - closer, err := factory() - if err != nil { - continue - } - p.numOpen++ - p.pool <- closer - } - return p, nil -} - -func (p *GenericPool) Acquire() (io.Closer, error) { - if p.closed { - return nil, ErrPoolClosed - } - for { - closer, err := p.getOrCreate() - if err != nil { - return nil, err - } - // todo maxLifttime处理 - return closer, nil - } -} - -func (p *GenericPool) getOrCreate() (io.Closer, error) { - select { - case closer := <-p.pool: - return closer, nil - default: - } - p.Lock() - if p.numOpen >= p.maxOpen { - closer := <-p.pool - p.Unlock() - return closer, nil - } - // 新建连接 - closer, err := p.factory() - if err != nil { - p.Unlock() - return nil, err - } - p.numOpen++ - p.Unlock() - return closer, nil -} - -// 释放单个资源到连接池 -func (p *GenericPool) Release(closer io.Closer) error { - if p.closed { - return ErrPoolClosed - } - p.Lock() - p.pool <- closer - p.Unlock() - return nil -} - -// 关闭单个资源 -func (p *GenericPool) Close(closer io.Closer) error { - p.Lock() - closer.Close() - p.numOpen-- - p.Unlock() - return nil -} - -// 关闭连接池,释放所有资源 -func (p *GenericPool) Shutdown() error { - if p.closed { - return ErrPoolClosed - } - p.Lock() - close(p.pool) - for closer := range p.pool { - closer.Close() - p.numOpen-- - } - p.closed = true - p.Unlock() - return nil + NumTotalConn() int + NumIdleConn() int } diff --git a/rpcservice/signaling/service.go b/rpcservice/signaling/service.go index 0144a11..a247cc1 100644 --- a/rpcservice/signaling/service.go +++ b/rpcservice/signaling/service.go @@ -5,7 +5,6 @@ import ( "cbsignal/hub" "cbsignal/rpcservice" "encoding/json" - "fmt" "github.com/lexkong/log" "net/rpc" ) @@ -23,26 +22,56 @@ func RegisterSignalService() error { func (b *Service) Signal(request rpcservice.SignalReq, reply *rpcservice.RpcResp) error { req := handler.SignalResp{} if err := json.Unmarshal(request.Data, &req);err != nil { + log.Warnf("json.Unmarshal error %s", err.Error()) return err } + + // test + //time.Sleep(3*time.Second) + log.Infof("rpc receive signal from %s to %s action %s", req.FromPeerId, request.ToPeerId, req.Action) - cli, ok := hub.GetClient(req.FromPeerId) - if !ok { - // 节点不存在 - reply.Success = false - reply.Reason = fmt.Sprintf("peer %s not found", req.FromPeerId) - } else { - reply.Success = true - signalMsg := handler.SignalMsg{ - Action: req.Action, - ToPeerId: request.ToPeerId, - Data: req.Data, - } - hdr, err := handler.NewHandlerMsg(signalMsg, cli) - if err != nil { - return err + go func() { + cli, ok := hub.GetClient(req.FromPeerId) + if !ok { + // 节点不存在 + //reply.Success = false + //reply.Reason = fmt.Sprintf("peer %s not found", req.FromPeerId) + } else { + //reply.Success = true + signalMsg := handler.SignalMsg{ + Action: req.Action, + ToPeerId: request.ToPeerId, + Data: req.Data, + } + hdr, err := handler.NewHandlerMsg(signalMsg, cli) + if err != nil { + log.Warnf("NewHandlerMsg err %s", err) + //return err + } + hdr.Handle() } - hdr.Handle() - } + }() + + //cli, ok := hub.GetClient(req.FromPeerId) + //if !ok { + // // 节点不存在 + // reply.Success = false + // reply.Reason = fmt.Sprintf("peer %s not found", req.FromPeerId) + //} else { + // reply.Success = true + // signalMsg := handler.SignalMsg{ + // Action: req.Action, + // ToPeerId: request.ToPeerId, + // Data: req.Data, + // } + // hdr, err := handler.NewHandlerMsg(signalMsg, cli) + // if err != nil { + // log.Warnf("NewHandlerMsg err %s", err) + // return err + // } + // hdr.Handle() + //} + + reply.Success = true return nil } diff --git a/util/cmap/cmap.go b/util/cmap/cmap.go index c15c5b5..650e31e 100644 --- a/util/cmap/cmap.go +++ b/util/cmap/cmap.go @@ -1,6 +1,7 @@ package cmap import ( + "cbsignal/client" "encoding/json" "sync" ) @@ -13,7 +14,7 @@ type ConcurrentMap []*ConcurrentMapShared // A "thread" safe string to anything map. type ConcurrentMapShared struct { - items map[string]interface{} + items map[string]*client.Client sync.RWMutex // Read Write mutex, guards access to internal map. } @@ -21,7 +22,7 @@ type ConcurrentMapShared struct { func New() ConcurrentMap { m := make(ConcurrentMap, SHARD_COUNT) for i := 0; i < SHARD_COUNT; i++ { - m[i] = &ConcurrentMapShared{items: make(map[string]interface{})} + m[i] = &ConcurrentMapShared{items: make(map[string]*client.Client)} } return m } @@ -31,7 +32,7 @@ func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { return m[uint(fnv32(key))%uint(SHARD_COUNT)] } -func (m ConcurrentMap) MSet(data map[string]interface{}) { +func (m ConcurrentMap) MSet(data map[string]*client.Client) { for key, value := range data { shard := m.GetShard(key) shard.Lock() @@ -41,7 +42,7 @@ func (m ConcurrentMap) MSet(data map[string]interface{}) { } // Sets the given value under the specified key. -func (m ConcurrentMap) Set(key string, value interface{}) { +func (m ConcurrentMap) Set(key string, value *client.Client) { // Get map shard. shard := m.GetShard(key) shard.Lock() @@ -53,10 +54,10 @@ func (m ConcurrentMap) Set(key string, value interface{}) { // It is called while lock is held, therefore it MUST NOT // try to access other keys in same map, as it can lead to deadlock since // Go sync.RWLock is not reentrant -type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{} +type UpsertCb func(exist bool, valueInMap *client.Client, newValue *client.Client) *client.Client // Insert or Update - updates existing element or inserts a new one using UpsertCb -func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) { +func (m ConcurrentMap) Upsert(key string, value *client.Client, cb UpsertCb) (res *client.Client) { shard := m.GetShard(key) shard.Lock() v, ok := shard.items[key] @@ -67,7 +68,7 @@ func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res i } // Sets the given value under the specified key if no value was associated with it. -func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool { +func (m ConcurrentMap) SetIfAbsent(key string, value *client.Client) bool { // Get map shard. shard := m.GetShard(key) shard.Lock() @@ -80,7 +81,7 @@ func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool { } // Get retrieves an element from map under given key. -func (m ConcurrentMap) Get(key string) (interface{}, bool) { +func (m ConcurrentMap) Get(key string) (*client.Client, bool) { // Get shard shard := m.GetShard(key) shard.RLock() @@ -153,7 +154,7 @@ func (m ConcurrentMap) Remove(key string) { // RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held // If returns true, the element will be removed from the map -type RemoveCb func(key string, v interface{}, exists bool) bool +type RemoveCb func(key string, v *client.Client, exists bool) bool // RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params // If callback returns true and element exists, it will remove it from the map @@ -172,7 +173,7 @@ func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool { } // Pop removes an element from the map and returns it -func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) { +func (m ConcurrentMap) Pop(key string) (v *client.Client, exists bool) { // Try to get shard. shard := m.GetShard(key) shard.Lock() @@ -190,7 +191,7 @@ func (m ConcurrentMap) IsEmpty() bool { // Used by the Iter & IterBuffered functions to wrap two variables together over a channel, type Tuple struct { Key string - Val interface{} + Val *client.Client } // Iter returns an iterator which could be used in a for range loop. @@ -277,8 +278,8 @@ func fanIn(chans []chan Tuple, out chan Tuple) { } // Items returns all items as map[string]interface{} -func (m ConcurrentMap) Items() map[string]interface{} { - tmp := make(map[string]interface{}) +func (m ConcurrentMap) Items() map[string]*client.Client { + tmp := make(map[string]*client.Client) // Insert items to temporary map. for item := range m.IterBuffered() { @@ -292,7 +293,7 @@ func (m ConcurrentMap) Items() map[string]interface{} { // maps. RLock is held for all calls for a given shard // therefore callback sess consistent view of a shard, // but not across the shards -type IterCb func(key string, v interface{}) +type IterCb func(key string, v *client.Client) // Callback based iterator, cheapest way to read // all elements in a map. @@ -341,7 +342,7 @@ func (m ConcurrentMap) Keys() []string { //Reviles ConcurrentMap "private" variables to json marshal. func (m ConcurrentMap) MarshalJSON() ([]byte, error) { // Create a temporary map, which will hold all item spread across shards. - tmp := make(map[string]interface{}) + tmp := make(map[string]*client.Client) // Insert items to temporary map. for item := range m.IterBuffered() {