diff --git a/metrics/connection.go b/metrics/connection.go index c27ffab..6d60fb3 100644 --- a/metrics/connection.go +++ b/metrics/connection.go @@ -30,13 +30,6 @@ var ( Name: "send_blockings", Help: "Number of send blocking connections", }) - - countMessageFromClosed = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: promNamespace, - Subsystem: promSubsystem, - Name: "sending_on_closed", - Help: "Number of sending on closed connections", - }) ) func IncNewConnection() { @@ -51,10 +44,6 @@ func IncSendBlocking() { countSendBlocking.Inc() } -func IncMessageFromClosed() { - countMessageFromClosed.Inc() -} - func SetCurrentConnections(num int) { gaugeCurrentConnections.Set(float64(num)) } diff --git a/relay/pendingSession.go b/relay/pendingSession.go deleted file mode 100644 index bd59ce3..0000000 --- a/relay/pendingSession.go +++ /dev/null @@ -1,112 +0,0 @@ -package relay - -import ( - "sort" - "sync" - "time" -) - -type PendingSessions []*pendingSession - -// SortedPendingSessions stores pending sessions in sorted manner by expireTime, meanwhile provides an O(1) time if lookup by topic -type SortedPendingSessions struct { - sessions PendingSessions - mapping map[string]*pendingSession - mutex sync.Mutex -} - -type pendingSession struct { - expireTime time.Time - topic string -} - -func (pq PendingSessions) Len() int { return len(pq) } - -func (pq PendingSessions) Less(i, j int) bool { - return pq[i].expireTime.Before(pq[j].expireTime) -} - -func (pq PendingSessions) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] -} - -func (pq *PendingSessions) insert(p *pendingSession) { - // i points to the smallest index that expires before p - i := sort.Search(len(*pq), func(i int) bool { - return (*pq)[i].expireTime.Before(p.expireTime) - }) - *pq = append(*pq, &pendingSession{}) // allocate a empty value at the end - copy((*pq)[i+1:], (*pq)[i:]) - (*pq)[i] = p -} - -func (pq *PendingSessions) delete(p *pendingSession) { - // i points to the smallest index that expires earlier or same as p - i := sort.Search(len(*pq), func(i int) bool { - return !(*pq)[i].expireTime.After(p.expireTime) - }) - - target := i - for j, session := range (*pq)[i:] { - if session.expireTime.Equal((*pq)[i].expireTime) && session.topic == (*pq)[i].topic { - target = i + j - break - } - } - if target < len(*pq) { - copy((*pq)[target:], (*pq)[target+1:]) - *pq = (*pq)[:len(*pq)-1] - } -} - -func (sps *SortedPendingSessions) insert(p *pendingSession) { - sps.mutex.Lock() - defer sps.mutex.Unlock() - - if _, ok := sps.mapping[p.topic]; ok { - return - } - - sps.sessions.insert(p) - sps.mapping[p.topic] = p -} - -// deleteByTopic deletes the pending session by topic -// the return indicates whether exists the pending session identified by the topic -func (sps *SortedPendingSessions) deleteByTopic(topic string) bool { - sps.mutex.Lock() - defer sps.mutex.Unlock() - - p, ok := sps.mapping[topic] - if ok { - sps.sessions.delete(p) - } - delete(sps.mapping, topic) - - return true -} - -// peak peaks the session that will be expired earliest but ain't remove it -func (sps *SortedPendingSessions) peak() *pendingSession { - if sps.sessions.Len() < 1 { - return nil - } - return sps.sessions[sps.sessions.Len()-1] -} - -// pop pops the session that will be expired earliest -func (sps *SortedPendingSessions) pop() { - sps.mutex.Lock() - defer sps.mutex.Unlock() - - session := sps.sessions[sps.sessions.Len()-1] - sps.sessions = sps.sessions[:sps.sessions.Len()-1] - delete(sps.mapping, session.topic) -} - -func NewSortedPendingSessions() *SortedPendingSessions { - return &SortedPendingSessions{ - sessions: PendingSessions{}, - mapping: make(map[string]*pendingSession), - } -} diff --git a/relay/pendingSession_test.go b/relay/pendingSession_test.go deleted file mode 100644 index a17f6bf..0000000 --- a/relay/pendingSession_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package relay - -import ( - "testing" - "time" -) - -func TestInsertPendingSession(t *testing.T) { - sps := NewSortedPendingSessions() - - session1 := &pendingSession{ - expireTime: time.Now().Add(5 * time.Second), // expire - topic: "topic1", - } - - session2 := &pendingSession{ - expireTime: time.Now().Add(4 * time.Second), // expire - topic: "topic2", - } - - session3 := &pendingSession{ - expireTime: time.Now().Add(3 * time.Second), // expire - topic: "topic3", - } - - sps.insert(session1) - sps.insert(session3) - sps.insert(session2) - - actual := sps.peak() - if actual.topic != session3.topic { - t.Errorf("unexpected top session, expected: %v, actual: %v\n", session3.topic, actual.topic) - } - - sps.pop() - actual = sps.peak() - if actual.topic != session2.topic { - t.Errorf("unexpected top session, expected: %v, actual: %v\n", session2.topic, actual.topic) - } - - sps.pop() - actual = sps.peak() - if actual.topic != session1.topic { - t.Errorf("unexpected top session, expected: %v, actual: %v\n", session1.topic, actual.topic) - } -} - -func TestDeletePendingSession(t *testing.T) { - sps := NewSortedPendingSessions() - - session1 := &pendingSession{ - expireTime: time.Now().Add(5 * time.Second), // expire - topic: "topic1", - } - - session2 := &pendingSession{ - expireTime: time.Now().Add(4 * time.Second), // expire - topic: "topic2", - } - - session3 := &pendingSession{ - expireTime: time.Now().Add(3 * time.Second), // expire - topic: "topic3", - } - - sps.insert(session1) - sps.insert(session3) - sps.insert(session2) - - sps.deleteByTopic("topic3") - - actual := sps.peak() - if actual.topic != session2.topic { - t.Errorf("unexpected top session, expected: %v, actual: %v\n", session2.topic, actual.topic) - } -} diff --git a/relay/schema.go b/relay/schema.go index 664d18b..638d612 100644 --- a/relay/schema.go +++ b/relay/schema.go @@ -1,6 +1,8 @@ package relay import ( + "crypto/rand" + "encoding/base64" "encoding/json" "strings" "sync" @@ -159,6 +161,16 @@ func NewTopicSet() *TopicSet { } } +func (tm TopicSet) Get() map[string]struct{} { + tm.Lock() + defer tm.Unlock() + newmap := make(map[string]struct{}) + for key, value := range tm.Data { + newmap[key] = value + } + return newmap +} + func (tm TopicSet) Set(topic string) { tm.Lock() defer tm.Unlock() @@ -178,3 +190,11 @@ type ClientUnregisterEvent struct { client *client reason error } + +func generateRandomBytes16() string { + buf := make([]byte, 16) + if _, err := rand.Read(buf); err != nil { + return "" + } + return base64.StdEncoding.EncodeToString(buf) +} diff --git a/relay/schema_test.go b/relay/schema_test.go index 540a522..508b7da 100644 --- a/relay/schema_test.go +++ b/relay/schema_test.go @@ -85,3 +85,27 @@ func TestGetTopicsByClientAndClear(t *testing.T) { t.Errorf("length error, expected: %v, actual: %v", expectedLen, actualLen) } } + +func TestTopicGet(t *testing.T) { + ts := NewTopicSet() + + ts.Set("hello") + ts.Set("hello1") + ts.Set("hello2") + + topics := ts.Get() + actualLen := len(topics) + expectedLen := 3 + if actualLen != expectedLen { + t.Errorf("length error, expected: %v, actual: %v", expectedLen, actualLen) + } + if _, ok := topics["hello"]; !ok { + t.Errorf("key does not exists") + } + if _, ok := topics["hello1"]; !ok { + t.Errorf("key does not exists") + } + if _, ok := topics["hello2"]; !ok { + t.Errorf("key does not exists") + } +} diff --git a/relay/util.go b/relay/util.go deleted file mode 100644 index 1642081..0000000 --- a/relay/util.go +++ /dev/null @@ -1,14 +0,0 @@ -package relay - -import ( - "crypto/rand" - "encoding/base64" -) - -func generateRandomBytes16() string { - buf := make([]byte, 16) - if _, err := rand.Read(buf); err != nil { - return "" - } - return base64.StdEncoding.EncodeToString(buf) -} diff --git a/relay/wsconn.go b/relay/wsconn.go index 4935acb..6ccdd2b 100644 --- a/relay/wsconn.go +++ b/relay/wsconn.go @@ -3,7 +3,6 @@ package relay import ( "bytes" "encoding/json" - "fmt" "strings" "github.com/RabbyHub/derelay/log" @@ -19,7 +18,6 @@ type client struct { id string // randomly generate, just for logging role RoleType // dapp or wallet - session string // session id pubTopics *TopicSet subTopics *TopicSet @@ -31,7 +29,6 @@ func (c *client) MarshalLogObject(encoder zapcore.ObjectEncoder) error { if c != nil { encoder.AddString("id", c.id) encoder.AddString("role", string(c.role)) - encoder.AddString("session", string(c.session)) encoder.AddArray("pubTopics", c.pubTopics) encoder.AddArray("subTopics", c.subTopics) } @@ -87,7 +84,6 @@ func (c *client) send(message SocketMessage) { case c.sendbuf <- message: default: metrics.IncSendBlocking() - log.Error("client sendbuf full", fmt.Errorf(""), zap.Any("client", c), zap.Any("len(sendbuf)", len(c.sendbuf)), zap.Any("message", message)) } } diff --git a/relay/wshandlers.go b/relay/wshandlers.go index 5727c12..b6ebc7f 100644 --- a/relay/wshandlers.go +++ b/relay/wshandlers.go @@ -24,12 +24,12 @@ func (ws *WsServer) pubMessage(message SocketMessage) { } } - log.Info("publish message", zap.Any("client", publisher), zap.Any("topic", message.Topic)) + log.Debug("publish message", zap.Any("client", publisher), zap.Any("topic", message.Topic)) metrics.IncTotalMessages() key := messageChanKey(topic) if count, _ := ws.redisConn.Publish(context.TODO(), key, message).Result(); count >= 1 { - log.Info("message published", zap.Any("client", publisher), zap.Any("topic", topic)) + log.Debug("message published", zap.Any("client", publisher), zap.Any("topic", topic)) if publisher.role == Dapp { publisher.send(SocketMessage{ Topic: message.Topic, @@ -38,7 +38,7 @@ func (ws *WsServer) pubMessage(message SocketMessage) { }) } } else { - log.Info("cache message", zap.Any("client", publisher), zap.Any("topic", topic)) + log.Debug("cache message", zap.Any("client", publisher), zap.Any("topic", topic)) metrics.IncCachedMessages() if message.Phase == string(SessionRequest) { metrics.IncNewRequestedSessions() @@ -54,11 +54,11 @@ func (ws *WsServer) subMessage(message SocketMessage) { if err := ws.redisSubConn.Subscribe(context.TODO(), messageChanKey(topic)); err != nil { log.Warn("[redisSub] subscribe to topic fail", zap.String("topic", topic), zap.Any("client", subscriber)) } - log.Info("subscribe to topic", zap.String("topic", topic), zap.Any("client", subscriber)) + log.Debug("subscribe to topic", zap.String("topic", topic), zap.Any("client", subscriber)) // forward cached notificatoins if there's any notifications := ws.getCachedMessages(topic, true) - log.Info("pending notifications", zap.String("topic", topic), zap.Any("num", len(notifications)), zap.Any("client", subscriber)) + log.Debug("pending notifications", zap.String("topic", topic), zap.Any("num", len(notifications)), zap.Any("client", subscriber)) for _, notification := range notifications { subscriber.send(notification) } @@ -82,9 +82,8 @@ func (ws *WsServer) subMessage(message SocketMessage) { // messages from wallet if noti.Phase == string(SessionRequest) { // handle the 1st case stated above - subscriber.session = noti.Topic metrics.IncReceivedSessions() - log.Info("session been scanned", zap.Any("topic", topic), zap.Any("client", subscriber)) + log.Debug("session been scanned", zap.Any("topic", topic), zap.Any("client", subscriber)) // notify the topic publisher, aka the dapp, that the session request has been received by wallet key := dappNotifyChanKey(noti.Topic) @@ -136,24 +135,24 @@ func (ws *WsServer) handleClientDisconnect(client *client) { // clear the client from the subscribed and published topics channelsToClear := []string{} - subscribedTopics := ws.subscribers.GetTopicsByClient(client, true) + //subscribedTopics := ws.subscribers.GetTopicsByClient(client, true) + subscribedTopics := client.subTopics.Get() - for _, topic := range subscribedTopics { - count := len(ws.subscribers.Get(topic)) - if count == 0 { + for topic := range subscribedTopics { + ws.subscribers.Unset(topic, client) + if ws.subscribers.Len(topic) == 0 { channelsToClear = append(channelsToClear, messageChanKey(topic)) } } - - if client.role == Dapp { - // clear dapp notify channels - publishedChannels := []string{} - for _, topic := range ws.publishers.GetTopicsByClient(client, true) { - if len(ws.publishers.Get(topic)) == 0 { - publishedChannels = append(publishedChannels, dappNotifyChanKey(topic)) + for topic := range client.pubTopics.Get() { + ws.publishers.Unset(topic, client) + if ws.publishers.Len(topic) == 0 { + channelsToClear = append(channelsToClear, messageChanKey(topic)) + // for dapp, need to further clear notify channels + if client.role == Dapp { + channelsToClear = append(channelsToClear, dappNotifyChanKey(topic)) } } - channelsToClear = append(channelsToClear, publishedChannels...) } if len(channelsToClear) > 0 { @@ -168,7 +167,7 @@ func (ws *WsServer) handleClientDisconnect(client *client) { if client.role == Dapp { return } - for _, topic := range subscribedTopics { + for topic := range subscribedTopics { go func(topic string) { key := dappNotifyChanKey(topic) ws.redisConn.Publish(context.TODO(), key, SocketMessage{ diff --git a/relay/wsserver.go b/relay/wsserver.go index 2e84c6f..d4cfb91 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -72,7 +72,7 @@ func (ws *WsServer) NewClientConn(w http.ResponseWriter, r *http.Request) { ws: ws, pubTopics: NewTopicSet(), subTopics: NewTopicSet(), - sendbuf: make(chan SocketMessage, 256), + sendbuf: make(chan SocketMessage, 8), quit: make(chan struct{}), } @@ -80,7 +80,6 @@ func (ws *WsServer) NewClientConn(w http.ResponseWriter, r *http.Request) { go client.read() go client.write() - //go client.heartbeat() } func (ws *WsServer) Run() { @@ -91,9 +90,6 @@ func (ws *WsServer) Run() { for { select { case message := <-ws.localCh: - if _, ok := ws.clients[message.client]; !ok { - metrics.IncMessageFromClosed() - } // local message could be "pub", "sub" or "ack" or "ping" // pub/sub message handler may contain time-consuming operations(e.g. read/write redis) // so put them in separate goroutine to avoid blocking wsserver main loop @@ -138,16 +134,11 @@ func (ws *WsServer) Run() { // * SessionResumed // * relay generated fake "ack" for the wallet for _, publisher := range ws.GetDappPublisher(message.Topic) { - log.Info("wallet updates, notify dapp", zap.Any("client", publisher), zap.Any("message", message)) - // if SessionReceived, the message topic is the QRCode topic, use it as the session id - if message.Phase == string(SessionReceived) { - publisher.session = message.Topic - } + log.Debug("wallet updates, notify dapp", zap.Any("client", publisher), zap.Any("message", message)) publisher.send(message) } case client := <-ws.register: - log.Info("new client connection", zap.Any("client", client)) metrics.IncNewConnection() ws.clients[client] = struct{}{} metrics.SetCurrentConnections(len(ws.clients))